diff --git a/core/src/job/mod.rs b/core/src/job/mod.rs index d492639a4..dec1ff267 100644 --- a/core/src/job/mod.rs +++ b/core/src/job/mod.rs @@ -1,5 +1,5 @@ use crate::{ - location::{indexer::IndexerError, LocationError}, + location::{indexer::IndexerError, LocationError, LocationManagerError}, object::{identifier_job::IdentifierJobError, preview::ThumbnailError}, }; @@ -45,6 +45,8 @@ pub enum JobError { MissingJobDataState(Uuid, String), #[error("missing some job data: '{value}'")] MissingData { value: String }, + #[error("Location manager error: {0}")] + LocationManager(#[from] LocationManagerError), // Specific job errors #[error("Indexer error: {0}")] diff --git a/core/src/lib.rs b/core/src/lib.rs index d7d532797..5cdda5422 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -77,6 +77,11 @@ impl Node { .parse() .expect("Error invalid tracing directive!"), ) + .add_directive( + "sd_core::location::manager=info" + .parse() + .expect("Error invalid tracing directive!"), + ) .add_directive( "sd_core_mobile=debug" .parse() diff --git a/core/src/location/manager/helpers.rs b/core/src/location/manager/helpers.rs index 867490ec9..4e613133d 100644 --- a/core/src/location/manager/helpers.rs +++ b/core/src/location/manager/helpers.rs @@ -1,16 +1,16 @@ use crate::{library::LibraryContext, prisma::location}; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, path::{Path, PathBuf}, time::Duration, }; -use tokio::{fs, io::ErrorKind, time::sleep}; +use tokio::{fs, io::ErrorKind, sync::oneshot, time::sleep}; use tracing::{error, warn}; use uuid::Uuid; -use super::{watcher::LocationWatcher, LocationId}; +use super::{watcher::LocationWatcher, LocationId, LocationManagerError}; type LibraryId = Uuid; type LocationAndLibraryKey = (LocationId, LibraryId); @@ -163,3 +163,176 @@ pub(super) fn subtract_location_path( None } } + +pub(super) async fn handle_remove_location_request( + location_id: LocationId, + library_ctx: LibraryContext, + response_tx: oneshot::Sender>, + forced_unwatch: &mut HashSet, + locations_watched: &mut HashMap, + locations_unwatched: &mut HashMap, + to_remove: &mut HashSet, +) { + let key = (location_id, library_ctx.id); + if let Some(location) = get_location(location_id, &library_ctx).await { + if let Some(ref local_path_str) = location.local_path.clone() { + unwatch_location( + location, + library_ctx.id, + local_path_str, + locations_watched, + locations_unwatched, + ); + locations_unwatched.remove(&key); + forced_unwatch.remove(&key); + } else { + drop_location( + location_id, + library_ctx.id, + "Dropping location from location manager, because we don't have a `local_path` anymore", + locations_watched, + locations_unwatched + ); + } + } else { + drop_location( + location_id, + library_ctx.id, + "Removing location from manager, as we failed to fetch from db", + locations_watched, + locations_unwatched, + ); + } + + // Marking location as removed, so we don't try to check it when the time comes + to_remove.insert(key); + + let _ = response_tx.send(Ok(())); // ignore errors, we handle errors on receiver +} + +pub(super) async fn handle_stop_watcher_request( + location_id: LocationId, + library_ctx: LibraryContext, + response_tx: oneshot::Sender>, + forced_unwatch: &mut HashSet, + locations_watched: &mut HashMap, + locations_unwatched: &mut HashMap, +) { + async fn inner( + location_id: LocationId, + library_ctx: LibraryContext, + forced_unwatch: &mut HashSet, + locations_watched: &mut HashMap, + locations_unwatched: &mut HashMap, + ) -> Result<(), LocationManagerError> { + let key = (location_id, library_ctx.id); + if !forced_unwatch.contains(&key) && locations_watched.contains_key(&key) { + get_location(location_id, &library_ctx) + .await + .ok_or_else(|| LocationManagerError::FailedToStopOrReinitWatcher { + reason: String::from("failed to fetch location from db"), + }) + .map(|location| { + location + .local_path + .clone() + .ok_or(LocationManagerError::LocationMissingLocalPath(location_id)) + .map(|local_path_str| { + unwatch_location( + location, + library_ctx.id, + local_path_str, + locations_watched, + locations_unwatched, + ); + forced_unwatch.insert(key); + }) + })? + } else { + Ok(()) + } + } + + let _ = response_tx.send( + inner( + location_id, + library_ctx, + forced_unwatch, + locations_watched, + locations_unwatched, + ) + .await, + ); // ignore errors, we handle errors on receiver +} + +pub(super) async fn handle_reinit_watcher_request( + location_id: LocationId, + library_ctx: LibraryContext, + response_tx: oneshot::Sender>, + forced_unwatch: &mut HashSet, + locations_watched: &mut HashMap, + locations_unwatched: &mut HashMap, +) { + async fn inner( + location_id: LocationId, + library_ctx: LibraryContext, + forced_unwatch: &mut HashSet, + locations_watched: &mut HashMap, + locations_unwatched: &mut HashMap, + ) -> Result<(), LocationManagerError> { + let key = (location_id, library_ctx.id); + if forced_unwatch.contains(&key) && locations_unwatched.contains_key(&key) { + get_location(location_id, &library_ctx) + .await + .ok_or_else(|| LocationManagerError::FailedToStopOrReinitWatcher { + reason: String::from("failed to fetch location from db"), + }) + .map(|location| { + location + .local_path + .clone() + .ok_or(LocationManagerError::LocationMissingLocalPath(location_id)) + .map(|local_path_str| { + watch_location( + location, + library_ctx.id, + local_path_str, + locations_watched, + locations_unwatched, + ); + forced_unwatch.remove(&key); + }) + })? + } else { + Ok(()) + } + } + + let _ = response_tx.send( + inner( + location_id, + library_ctx, + forced_unwatch, + locations_watched, + locations_unwatched, + ) + .await, + ); // ignore errors, we handle errors on receiver +} + +pub(super) fn handle_ignore_path_request( + location_id: LocationId, + library_ctx: LibraryContext, + path: PathBuf, + ignore: bool, + response_tx: oneshot::Sender>, + locations_watched: &HashMap, +) { + let _ = response_tx.send( + if let Some(watcher) = locations_watched.get(&(location_id, library_ctx.id)) { + watcher.ignore_path(path, ignore) + } else { + Ok(()) + }, + ); // ignore errors, we handle errors on receiver +} diff --git a/core/src/location/manager/mod.rs b/core/src/location/manager/mod.rs index cc72c66e8..58e617f10 100644 --- a/core/src/location/manager/mod.rs +++ b/core/src/location/manager/mod.rs @@ -1,13 +1,20 @@ use crate::library::LibraryContext; -use std::{path::PathBuf, sync::Arc}; - -use thiserror::Error; -use tokio::{ - io, - sync::{mpsc, oneshot}, +use std::{ + path::{Path, PathBuf}, + sync::Arc, }; -use tracing::{debug, error}; + +use futures::executor::block_on; +use thiserror::Error; +use tokio::{io, sync::oneshot}; +use tracing::error; + +#[cfg(feature = "location-watcher")] +use tokio::sync::mpsc; + +#[cfg(feature = "location-watcher")] +use tracing::debug; #[cfg(feature = "location-watcher")] mod watcher; @@ -17,16 +24,53 @@ mod helpers; pub type LocationId = i32; -type ManagerMessage = ( - LocationId, - LibraryContext, - oneshot::Sender>, -); +#[derive(Clone, Copy, Debug)] +#[allow(dead_code)] +enum ManagementMessageAction { + Add, + Remove, +} + +#[derive(Debug)] +#[allow(dead_code)] +pub struct LocationManagementMessage { + location_id: LocationId, + library_ctx: LibraryContext, + action: ManagementMessageAction, + response_tx: oneshot::Sender>, +} + +#[derive(Debug)] +#[allow(dead_code)] +enum WatcherManagementMessageAction { + Stop, + Reinit, + IgnoreEventsForPath { path: PathBuf, ignore: bool }, +} + +#[derive(Debug)] +#[allow(dead_code)] +pub struct WatcherManagementMessage { + location_id: LocationId, + library_ctx: LibraryContext, + action: WatcherManagementMessageAction, + response_tx: oneshot::Sender>, +} #[derive(Error, Debug)] pub enum LocationManagerError { - #[error("Unable to send location id to be checked by actor: (error: {0})")] - ActorSendLocationError(#[from] mpsc::error::SendError), + #[cfg(feature = "location-watcher")] + #[error("Unable to send location management message to location manager actor: (error: {0})")] + ActorSendLocationError(#[from] mpsc::error::SendError), + + #[cfg(feature = "location-watcher")] + #[error("Unable to send path to be ignored by watcher actor: (error: {0})")] + ActorIgnorePathError(#[from] mpsc::error::SendError), + + #[cfg(feature = "location-watcher")] + #[error("Unable to watcher management message to watcher manager actor: (error: {0})")] + ActorIgnorePathMessageError(#[from] mpsc::error::SendError), + #[error("Unable to receive actor response: (error: {0})")] ActorResponseError(#[from] oneshot::error::RecvError), @@ -34,6 +78,9 @@ pub enum LocationManagerError { #[error("Watcher error: (error: {0})")] WatcherError(#[from] notify::Error), + #[error("Failed to stop or reinit a watcher: {reason}")] + FailedToStopOrReinitWatcher { reason: String }, + #[error("Location missing local path: ")] LocationMissingLocalPath(LocationId), #[error("Tried to update a non-existing file: ")] @@ -48,35 +95,98 @@ pub enum LocationManagerError { #[derive(Debug)] pub struct LocationManager { - add_locations_tx: mpsc::Sender, - remove_locations_tx: mpsc::Sender, + #[cfg(feature = "location-watcher")] + location_management_tx: mpsc::Sender, + #[cfg(feature = "location-watcher")] + watcher_management_tx: mpsc::Sender, stop_tx: Option>, } impl LocationManager { - #[allow(unused)] pub fn new() -> Arc { - let (add_locations_tx, add_locations_rx) = mpsc::channel(128); - let (remove_locations_tx, remove_locations_rx) = mpsc::channel(128); - let (stop_tx, stop_rx) = oneshot::channel(); - #[cfg(feature = "location-watcher")] - tokio::spawn(Self::run_locations_checker( - add_locations_rx, - remove_locations_rx, - stop_rx, - )); + { + let (location_management_tx, location_management_rx) = mpsc::channel(128); + let (watcher_management_tx, watcher_management_rx) = mpsc::channel(128); + let (stop_tx, stop_rx) = oneshot::channel(); + + #[cfg(feature = "location-watcher")] + tokio::spawn(Self::run_locations_checker( + location_management_rx, + watcher_management_rx, + stop_rx, + )); + + debug!("Location manager initialized"); + + Arc::new(Self { + location_management_tx, + watcher_management_tx, + stop_tx: Some(stop_tx), + }) + } #[cfg(not(feature = "location-watcher"))] - tracing::warn!("Location watcher is disabled, locations will not be checked"); + { + tracing::warn!("Location watcher is disabled, locations will not be checked"); + Arc::new(Self { stop_tx: None }) + } + } - debug!("Location manager initialized"); + #[inline] + #[allow(unused_variables)] + async fn location_management_message( + &self, + location_id: LocationId, + library_ctx: LibraryContext, + action: ManagementMessageAction, + ) -> Result<(), LocationManagerError> { + #[cfg(feature = "location-watcher")] + { + let (tx, rx) = oneshot::channel(); - Arc::new(Self { - add_locations_tx, - remove_locations_tx, - stop_tx: Some(stop_tx), - }) + self.location_management_tx + .send(LocationManagementMessage { + location_id, + library_ctx, + action, + response_tx: tx, + }) + .await?; + + rx.await? + } + + #[cfg(not(feature = "location-watcher"))] + Ok(()) + } + + #[inline] + #[allow(unused_variables)] + async fn watcher_management_message( + &self, + location_id: LocationId, + library_ctx: LibraryContext, + action: WatcherManagementMessageAction, + ) -> Result<(), LocationManagerError> { + #[cfg(feature = "location-watcher")] + { + let (tx, rx) = oneshot::channel(); + + self.watcher_management_tx + .send(WatcherManagementMessage { + location_id, + library_ctx, + action, + response_tx: tx, + }) + .await?; + + rx.await? + } + + #[cfg(not(feature = "location-watcher"))] + Ok(()) } pub async fn add( @@ -84,17 +194,8 @@ impl LocationManager { location_id: LocationId, library_ctx: LibraryContext, ) -> Result<(), LocationManagerError> { - if cfg!(feature = "location-watcher") { - let (tx, rx) = oneshot::channel(); - - self.add_locations_tx - .send((location_id, library_ctx, tx)) - .await?; - - rx.await? - } else { - Ok(()) - } + self.location_management_message(location_id, library_ctx, ManagementMessageAction::Add) + .await } pub async fn remove( @@ -102,23 +203,80 @@ impl LocationManager { location_id: LocationId, library_ctx: LibraryContext, ) -> Result<(), LocationManagerError> { - if cfg!(feature = "location-watcher") { - let (tx, rx) = oneshot::channel(); + self.location_management_message(location_id, library_ctx, ManagementMessageAction::Remove) + .await + } - self.remove_locations_tx - .send((location_id, library_ctx, tx)) - .await?; + pub async fn stop_watcher( + &self, + location_id: LocationId, + library_ctx: LibraryContext, + ) -> Result<(), LocationManagerError> { + self.watcher_management_message( + location_id, + library_ctx, + WatcherManagementMessageAction::Stop, + ) + .await + } - rx.await? - } else { - Ok(()) - } + pub async fn reinit_watcher( + &self, + location_id: LocationId, + library_ctx: LibraryContext, + ) -> Result<(), LocationManagerError> { + self.watcher_management_message( + location_id, + library_ctx, + WatcherManagementMessageAction::Reinit, + ) + .await + } + + pub async fn temporary_stop( + &self, + location_id: LocationId, + library_ctx: LibraryContext, + ) -> Result { + self.stop_watcher(location_id, library_ctx.clone()).await?; + + Ok(StopWatcherGuard { + location_id, + library_ctx: Some(library_ctx), + manager: self, + }) + } + + pub async fn temporary_ignore_events_for_path( + &self, + location_id: LocationId, + library_ctx: LibraryContext, + path: impl AsRef, + ) -> Result { + let path = path.as_ref().to_path_buf(); + + self.watcher_management_message( + location_id, + library_ctx.clone(), + WatcherManagementMessageAction::IgnoreEventsForPath { + path: path.clone(), + ignore: true, + }, + ) + .await?; + + Ok(IgnoreEventsForPathGuard { + location_id, + library_ctx: Some(library_ctx), + manager: self, + path: Some(path), + }) } #[cfg(feature = "location-watcher")] async fn run_locations_checker( - mut add_locations_rx: mpsc::Receiver, - mut remove_locations_rx: mpsc::Receiver, + mut location_management_rx: mpsc::Receiver, + mut watcher_management_rx: mpsc::Receiver, mut stop_rx: oneshot::Receiver<()>, ) -> Result<(), LocationManagerError> { use std::collections::{HashMap, HashSet}; @@ -128,8 +286,9 @@ impl LocationManager { use tracing::{info, warn}; use helpers::{ - check_online, drop_location, get_location, location_check_sleep, unwatch_location, - watch_location, + check_online, drop_location, get_location, handle_ignore_path_request, + handle_reinit_watcher_request, handle_remove_location_request, + handle_stop_watcher_request, location_check_sleep, unwatch_location, watch_location, }; use watcher::LocationWatcher; @@ -137,95 +296,133 @@ impl LocationManager { let mut to_remove = HashSet::new(); let mut locations_watched = HashMap::new(); let mut locations_unwatched = HashMap::new(); + let mut forced_unwatch = HashSet::new(); loop { select! { - // To add a new location - Some((location_id, library_ctx, response_tx)) = add_locations_rx.recv() => { - if let Some(location) = get_location(location_id, &library_ctx).await { - let is_online = check_online(&location, &library_ctx).await; - let _ = response_tx.send( - LocationWatcher::new(location, library_ctx.clone()) - .await - .map(|mut watcher| { - if is_online { - watcher.watch(); - locations_watched.insert( - (location_id, library_ctx.id), - watcher - ); - } else { - locations_unwatched.insert( - (location_id, library_ctx.id), - watcher - ); - } + // Location management messages + Some(LocationManagementMessage{ + location_id, + library_ctx, + action, + response_tx + }) = location_management_rx.recv() => { + match action { - to_check_futures.push( - location_check_sleep(location_id, library_ctx) - ); - } - ) - ); // ignore errors, we handle errors on receiver - } else { - warn!( - "Location not found in database to be watched: {}", - location_id - ); + // To add a new location + ManagementMessageAction::Add => { + if let Some(location) = get_location(location_id, &library_ctx).await { + let is_online = check_online(&location, &library_ctx).await; + let _ = response_tx.send( + LocationWatcher::new(location, library_ctx.clone()) + .await + .map(|mut watcher| { + if is_online { + watcher.watch(); + locations_watched.insert( + (location_id, library_ctx.id), + watcher + ); + } else { + locations_unwatched.insert( + (location_id, library_ctx.id), + watcher + ); + } + + to_check_futures.push( + location_check_sleep(location_id, library_ctx) + ); + } + ) + ); // ignore errors, we handle errors on receiver + } else { + warn!( + "Location not found in database to be watched: {}", + location_id + ); + } + }, + + // To remove an location + ManagementMessageAction::Remove => { + handle_remove_location_request( + location_id, + library_ctx, + response_tx, + &mut forced_unwatch, + &mut locations_watched, + &mut locations_unwatched, + &mut to_remove, + ).await; + }, } } - // To remove an location - Some((location_id, library_ctx, response_tx)) = remove_locations_rx.recv() => { - if let Some(location) = get_location(location_id, &library_ctx).await { - if let Some(ref local_path_str) = location.local_path.clone() { - unwatch_location( - location, - library_ctx.id, - local_path_str, + // Watcher management messages + Some(WatcherManagementMessage{ + location_id, + library_ctx, + action, + response_tx, + }) = watcher_management_rx.recv() => { + match action { + // To stop a watcher + WatcherManagementMessageAction::Stop => { + handle_stop_watcher_request( + location_id, + library_ctx, + response_tx, + &mut forced_unwatch, &mut locations_watched, &mut locations_unwatched, - ); - locations_unwatched.remove(&(location_id, library_ctx.id)); - } else { - drop_location( + ).await; + }, + + // To reinit a stopped watcher + WatcherManagementMessageAction::Reinit => { + handle_reinit_watcher_request( location_id, - library_ctx.id, - "Dropping location from location manager, because we don't have a `local_path` anymore", + library_ctx, + response_tx, + &mut forced_unwatch, &mut locations_watched, - &mut locations_unwatched + &mut locations_unwatched, + ).await; + }, + + // To ignore or not events for a path + WatcherManagementMessageAction::IgnoreEventsForPath { path, ignore } => { + handle_ignore_path_request( + location_id, + library_ctx, + path, + ignore, + response_tx, + &locations_watched, ); - } - } else { - drop_location( - location_id, - library_ctx.id, - "Removing location from manager, as we failed to fetch from db", - &mut locations_watched, - &mut locations_unwatched - ); + }, } - - // Marking location as removed, so we don't try to check it when the time comes - to_remove.insert((location_id, library_ctx.id)); - - let _ = response_tx.send(Ok(())); // ignore errors, we handle errors on receiver } // Periodically checking locations Some((location_id, library_ctx)) = to_check_futures.next() => { - if to_remove.contains(&(location_id, library_ctx.id)) { + let key = (location_id, library_ctx.id); + + if to_remove.contains(&key) { // The time to check came for an already removed library, so we just ignore it - to_remove.remove(&(location_id, library_ctx.id)); + to_remove.remove(&key); } else if let Some(location) = get_location(location_id, &library_ctx).await { if let Some(ref local_path_str) = location.local_path.clone() { - if check_online(&location, &library_ctx).await { + if check_online(&location, &library_ctx).await + && !forced_unwatch.contains(&key) + { watch_location( location, library_ctx.id, local_path_str, &mut locations_watched, - &mut locations_unwatched + &mut locations_unwatched, ); } else { unwatch_location( @@ -233,7 +430,7 @@ impl LocationManager { library_ctx.id, local_path_str, &mut locations_watched, - &mut locations_unwatched + &mut locations_unwatched, ); } to_check_futures.push(location_check_sleep(location_id, library_ctx)); @@ -241,10 +438,12 @@ impl LocationManager { drop_location( location_id, library_ctx.id, - "Dropping location from location manager, because we don't have a `local_path` anymore", + "Dropping location from location manager, because \ + we don't have a `local_path` anymore", &mut locations_watched, &mut locations_unwatched ); + forced_unwatch.remove(&key); } } else { drop_location( @@ -252,8 +451,9 @@ impl LocationManager { library_ctx.id, "Removing location from manager, as we failed to fetch from db", &mut locations_watched, - &mut locations_unwatched + &mut locations_unwatched, ); + forced_unwatch.remove(&key); } } @@ -277,3 +477,50 @@ impl Drop for LocationManager { } } } + +#[must_use = "this `StopWatcherGuard` must be held for some time, so the watcher is stopped"] +pub struct StopWatcherGuard<'m> { + manager: &'m LocationManager, + location_id: LocationId, + library_ctx: Option, +} + +impl Drop for StopWatcherGuard<'_> { + fn drop(&mut self) { + if cfg!(feature = "location-watcher") { + // FIXME: change this Drop to async drop in the future + if let Err(e) = block_on( + self.manager + .reinit_watcher(self.location_id, self.library_ctx.take().unwrap()), + ) { + error!("Failed to reinit watcher on stop watcher guard drop: {e}"); + } + } + } +} + +#[must_use = "this `IgnoreEventsForPathGuard` must be held for some time, so the watcher can ignore events for the desired path"] +pub struct IgnoreEventsForPathGuard<'m> { + manager: &'m LocationManager, + path: Option, + location_id: LocationId, + library_ctx: Option, +} + +impl Drop for IgnoreEventsForPathGuard<'_> { + fn drop(&mut self) { + if cfg!(feature = "location-watcher") { + // FIXME: change this Drop to async drop in the future + if let Err(e) = block_on(self.manager.watcher_management_message( + self.location_id, + self.library_ctx.take().unwrap(), + WatcherManagementMessageAction::IgnoreEventsForPath { + path: self.path.take().unwrap(), + ignore: false, + }, + )) { + error!("Failed to un-ignore path on watcher guard drop: {e}"); + } + } + } +} diff --git a/core/src/location/manager/watcher/mod.rs b/core/src/location/manager/watcher/mod.rs index 8c071978a..706874528 100644 --- a/core/src/location/manager/watcher/mod.rs +++ b/core/src/location/manager/watcher/mod.rs @@ -3,7 +3,10 @@ use crate::{ prisma::{file_path, location}, }; -use std::path::{Path, PathBuf}; +use std::{ + collections::HashSet, + path::{Path, PathBuf}, +}; use async_trait::async_trait; use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher}; @@ -39,6 +42,8 @@ type Handler = windows::WindowsEventHandler; file_path::include!(file_path_with_object { object }); +pub(super) type IgnorePath = (PathBuf, bool); + #[async_trait] trait EventHandler { fn new() -> Self @@ -58,6 +63,7 @@ pub(super) struct LocationWatcher { location: location::Data, path: PathBuf, watcher: RecommendedWatcher, + ignore_path_tx: mpsc::UnboundedSender, handle: Option>, stop_tx: Option>, } @@ -68,6 +74,7 @@ impl LocationWatcher { library_ctx: LibraryContext, ) -> Result { let (events_tx, events_rx) = mpsc::unbounded_channel(); + let (ignore_path_tx, ignore_path_rx) = mpsc::unbounded_channel(); let (stop_tx, stop_rx) = oneshot::channel(); let watcher = RecommendedWatcher::new( @@ -89,12 +96,6 @@ impl LocationWatcher { Config::default(), )?; - let handle = tokio::spawn(Self::handle_watch_events( - location.id, - library_ctx, - events_rx, - stop_rx, - )); let path = PathBuf::from( location .local_path @@ -102,10 +103,19 @@ impl LocationWatcher { .ok_or(LocationManagerError::LocationMissingLocalPath(location.id))?, ); + let handle = tokio::spawn(Self::handle_watch_events( + location.id, + library_ctx, + events_rx, + ignore_path_rx, + stop_rx, + )); + Ok(Self { location, path, watcher, + ignore_path_tx, handle: Some(handle), stop_tx: Some(stop_tx), }) @@ -115,10 +125,13 @@ impl LocationWatcher { location_id: LocationId, library_ctx: LibraryContext, mut events_rx: mpsc::UnboundedReceiver>, + mut ignore_path_rx: mpsc::UnboundedReceiver, mut stop_rx: oneshot::Receiver<()>, ) { let mut event_handler = Handler::new(); + let mut paths_to_ignore = HashSet::new(); + loop { select! { Some(event) = events_rx.recv() => { @@ -128,7 +141,8 @@ impl LocationWatcher { location_id, event, &mut event_handler, - &library_ctx + &library_ctx, + &paths_to_ignore, ).await { error!("Failed to handle location file system event: \ ", @@ -140,6 +154,15 @@ impl LocationWatcher { } } } + + Some((path, ignore)) = ignore_path_rx.recv() => { + if ignore { + paths_to_ignore.insert(path); + } else { + paths_to_ignore.remove(&path); + } + } + _ = &mut stop_rx => { debug!("Stop Location Manager event handler for location: ", location_id); break @@ -153,8 +176,9 @@ impl LocationWatcher { event: Event, event_handler: &mut impl EventHandler, library_ctx: &LibraryContext, + ignore_paths: &HashSet, ) -> Result<(), LocationManagerError> { - if check_event(&event) { + if check_event(&event, ignore_paths) { if let Some(location) = fetch_location(library_ctx, location_id) .include(indexer_job_location::include()) .exec() @@ -175,6 +199,14 @@ impl LocationWatcher { Ok(()) } + pub(super) fn ignore_path( + &self, + path: PathBuf, + ignore: bool, + ) -> Result<(), LocationManagerError> { + self.ignore_path_tx.send((path, ignore)).map_err(Into::into) + } + pub(super) fn check_path(&self, path: impl AsRef) -> bool { self.path == path.as_ref() } diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index 415a5b372..295d5d036 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -18,6 +18,7 @@ use crate::{ }; use std::{ + collections::HashSet, path::{Path, PathBuf}, str::FromStr, }; @@ -46,12 +47,13 @@ pub(super) fn check_location_online(location: &indexer_job_location::Data) -> bo } } -pub(super) fn check_event(event: &Event) -> bool { +pub(super) fn check_event(event: &Event, ignore_paths: &HashSet) -> bool { // if first path includes .DS_Store, ignore if event.paths.iter().any(|p| { p.to_str() .expect("Found non-UTF-8 path") .contains(".DS_Store") + || ignore_paths.contains(p) }) { return false; } diff --git a/core/src/object/fs/encrypt.rs b/core/src/object/fs/encrypt.rs index 09550740e..6e98a4261 100644 --- a/core/src/object/fs/encrypt.rs +++ b/core/src/object/fs/encrypt.rs @@ -1,9 +1,4 @@ -use std::{ - collections::VecDeque, - fs::{self, File}, - io::Read, - path::PathBuf, -}; +use std::{collections::VecDeque, fs::File, io::Read, path::PathBuf}; use tokio::task; @@ -64,8 +59,8 @@ const JOB_NAME: &str = "file_encryptor"; #[async_trait::async_trait] impl StatefulJob for FileEncryptorJob { - type Data = FileEncryptorJobState; type Init = FileEncryptorJobInit; + type Data = FileEncryptorJobState; type Step = FileEncryptorJobStep; fn name(&self) -> &'static str { @@ -136,8 +131,18 @@ impl StatefulJob for FileEncryptorJob { Ok, )?; - let mut reader = File::open(info.obj_path.clone())?; - let mut writer = File::create(output_path)?; + let _guard = ctx + .library_ctx + .location_manager() + .temporary_ignore_events_for_path( + state.init.location_id, + ctx.library_ctx.clone(), + &output_path, + ) + .await?; + + let mut reader = task::block_in_place(|| File::open(&info.obj_path))?; + let mut writer = task::block_in_place(|| File::create(output_path))?; let master_key = generate_master_key(); @@ -202,7 +207,7 @@ impl StatefulJob for FileEncryptorJob { .join("thumbnails") .join(object.cas_id + ".webp"); - if fs::metadata(pvm_path.clone()).is_ok() { + if tokio::fs::metadata(&pvm_path).await.is_ok() { let mut pvm_bytes = Vec::new(); task::block_in_place(|| { let mut pvm_file = File::open(pvm_path)?;