This commit is contained in:
Brendan Allan 2023-01-20 12:21:23 +08:00
commit ff5b80cc1a
7 changed files with 614 additions and 148 deletions

View file

@ -1,5 +1,5 @@
use crate::{
location::{indexer::IndexerError, LocationError},
location::{indexer::IndexerError, LocationError, LocationManagerError},
object::{identifier_job::IdentifierJobError, preview::ThumbnailError},
};
@ -45,6 +45,8 @@ pub enum JobError {
MissingJobDataState(Uuid, String),
#[error("missing some job data: '{value}'")]
MissingData { value: String },
#[error("Location manager error: {0}")]
LocationManager(#[from] LocationManagerError),
// Specific job errors
#[error("Indexer error: {0}")]

View file

@ -77,6 +77,11 @@ impl Node {
.parse()
.expect("Error invalid tracing directive!"),
)
.add_directive(
"sd_core::location::manager=info"
.parse()
.expect("Error invalid tracing directive!"),
)
.add_directive(
"sd_core_mobile=debug"
.parse()

View file

@ -1,16 +1,16 @@
use crate::{library::LibraryContext, prisma::location};
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
path::{Path, PathBuf},
time::Duration,
};
use tokio::{fs, io::ErrorKind, time::sleep};
use tokio::{fs, io::ErrorKind, sync::oneshot, time::sleep};
use tracing::{error, warn};
use uuid::Uuid;
use super::{watcher::LocationWatcher, LocationId};
use super::{watcher::LocationWatcher, LocationId, LocationManagerError};
type LibraryId = Uuid;
type LocationAndLibraryKey = (LocationId, LibraryId);
@ -163,3 +163,176 @@ pub(super) fn subtract_location_path(
None
}
}
pub(super) async fn handle_remove_location_request(
location_id: LocationId,
library_ctx: LibraryContext,
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
to_remove: &mut HashSet<LocationAndLibraryKey>,
) {
let key = (location_id, library_ctx.id);
if let Some(location) = get_location(location_id, &library_ctx).await {
if let Some(ref local_path_str) = location.local_path.clone() {
unwatch_location(
location,
library_ctx.id,
local_path_str,
locations_watched,
locations_unwatched,
);
locations_unwatched.remove(&key);
forced_unwatch.remove(&key);
} else {
drop_location(
location_id,
library_ctx.id,
"Dropping location from location manager, because we don't have a `local_path` anymore",
locations_watched,
locations_unwatched
);
}
} else {
drop_location(
location_id,
library_ctx.id,
"Removing location from manager, as we failed to fetch from db",
locations_watched,
locations_unwatched,
);
}
// Marking location as removed, so we don't try to check it when the time comes
to_remove.insert(key);
let _ = response_tx.send(Ok(())); // ignore errors, we handle errors on receiver
}
pub(super) async fn handle_stop_watcher_request(
location_id: LocationId,
library_ctx: LibraryContext,
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
) {
async fn inner(
location_id: LocationId,
library_ctx: LibraryContext,
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
) -> Result<(), LocationManagerError> {
let key = (location_id, library_ctx.id);
if !forced_unwatch.contains(&key) && locations_watched.contains_key(&key) {
get_location(location_id, &library_ctx)
.await
.ok_or_else(|| LocationManagerError::FailedToStopOrReinitWatcher {
reason: String::from("failed to fetch location from db"),
})
.map(|location| {
location
.local_path
.clone()
.ok_or(LocationManagerError::LocationMissingLocalPath(location_id))
.map(|local_path_str| {
unwatch_location(
location,
library_ctx.id,
local_path_str,
locations_watched,
locations_unwatched,
);
forced_unwatch.insert(key);
})
})?
} else {
Ok(())
}
}
let _ = response_tx.send(
inner(
location_id,
library_ctx,
forced_unwatch,
locations_watched,
locations_unwatched,
)
.await,
); // ignore errors, we handle errors on receiver
}
pub(super) async fn handle_reinit_watcher_request(
location_id: LocationId,
library_ctx: LibraryContext,
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
) {
async fn inner(
location_id: LocationId,
library_ctx: LibraryContext,
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
) -> Result<(), LocationManagerError> {
let key = (location_id, library_ctx.id);
if forced_unwatch.contains(&key) && locations_unwatched.contains_key(&key) {
get_location(location_id, &library_ctx)
.await
.ok_or_else(|| LocationManagerError::FailedToStopOrReinitWatcher {
reason: String::from("failed to fetch location from db"),
})
.map(|location| {
location
.local_path
.clone()
.ok_or(LocationManagerError::LocationMissingLocalPath(location_id))
.map(|local_path_str| {
watch_location(
location,
library_ctx.id,
local_path_str,
locations_watched,
locations_unwatched,
);
forced_unwatch.remove(&key);
})
})?
} else {
Ok(())
}
}
let _ = response_tx.send(
inner(
location_id,
library_ctx,
forced_unwatch,
locations_watched,
locations_unwatched,
)
.await,
); // ignore errors, we handle errors on receiver
}
pub(super) fn handle_ignore_path_request(
location_id: LocationId,
library_ctx: LibraryContext,
path: PathBuf,
ignore: bool,
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
locations_watched: &HashMap<LocationAndLibraryKey, LocationWatcher>,
) {
let _ = response_tx.send(
if let Some(watcher) = locations_watched.get(&(location_id, library_ctx.id)) {
watcher.ignore_path(path, ignore)
} else {
Ok(())
},
); // ignore errors, we handle errors on receiver
}

View file

@ -1,13 +1,20 @@
use crate::library::LibraryContext;
use std::{path::PathBuf, sync::Arc};
use thiserror::Error;
use tokio::{
io,
sync::{mpsc, oneshot},
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tracing::{debug, error};
use futures::executor::block_on;
use thiserror::Error;
use tokio::{io, sync::oneshot};
use tracing::error;
#[cfg(feature = "location-watcher")]
use tokio::sync::mpsc;
#[cfg(feature = "location-watcher")]
use tracing::debug;
#[cfg(feature = "location-watcher")]
mod watcher;
@ -17,16 +24,53 @@ mod helpers;
pub type LocationId = i32;
type ManagerMessage = (
LocationId,
LibraryContext,
oneshot::Sender<Result<(), LocationManagerError>>,
);
#[derive(Clone, Copy, Debug)]
#[allow(dead_code)]
enum ManagementMessageAction {
Add,
Remove,
}
#[derive(Debug)]
#[allow(dead_code)]
pub struct LocationManagementMessage {
location_id: LocationId,
library_ctx: LibraryContext,
action: ManagementMessageAction,
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
}
#[derive(Debug)]
#[allow(dead_code)]
enum WatcherManagementMessageAction {
Stop,
Reinit,
IgnoreEventsForPath { path: PathBuf, ignore: bool },
}
#[derive(Debug)]
#[allow(dead_code)]
pub struct WatcherManagementMessage {
location_id: LocationId,
library_ctx: LibraryContext,
action: WatcherManagementMessageAction,
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
}
#[derive(Error, Debug)]
pub enum LocationManagerError {
#[error("Unable to send location id to be checked by actor: (error: {0})")]
ActorSendLocationError(#[from] mpsc::error::SendError<ManagerMessage>),
#[cfg(feature = "location-watcher")]
#[error("Unable to send location management message to location manager actor: (error: {0})")]
ActorSendLocationError(#[from] mpsc::error::SendError<LocationManagementMessage>),
#[cfg(feature = "location-watcher")]
#[error("Unable to send path to be ignored by watcher actor: (error: {0})")]
ActorIgnorePathError(#[from] mpsc::error::SendError<watcher::IgnorePath>),
#[cfg(feature = "location-watcher")]
#[error("Unable to watcher management message to watcher manager actor: (error: {0})")]
ActorIgnorePathMessageError(#[from] mpsc::error::SendError<WatcherManagementMessage>),
#[error("Unable to receive actor response: (error: {0})")]
ActorResponseError(#[from] oneshot::error::RecvError),
@ -34,6 +78,9 @@ pub enum LocationManagerError {
#[error("Watcher error: (error: {0})")]
WatcherError(#[from] notify::Error),
#[error("Failed to stop or reinit a watcher: {reason}")]
FailedToStopOrReinitWatcher { reason: String },
#[error("Location missing local path: <id='{0}'>")]
LocationMissingLocalPath(LocationId),
#[error("Tried to update a non-existing file: <path='{0}'>")]
@ -48,35 +95,98 @@ pub enum LocationManagerError {
#[derive(Debug)]
pub struct LocationManager {
add_locations_tx: mpsc::Sender<ManagerMessage>,
remove_locations_tx: mpsc::Sender<ManagerMessage>,
#[cfg(feature = "location-watcher")]
location_management_tx: mpsc::Sender<LocationManagementMessage>,
#[cfg(feature = "location-watcher")]
watcher_management_tx: mpsc::Sender<WatcherManagementMessage>,
stop_tx: Option<oneshot::Sender<()>>,
}
impl LocationManager {
#[allow(unused)]
pub fn new() -> Arc<Self> {
let (add_locations_tx, add_locations_rx) = mpsc::channel(128);
let (remove_locations_tx, remove_locations_rx) = mpsc::channel(128);
let (stop_tx, stop_rx) = oneshot::channel();
#[cfg(feature = "location-watcher")]
tokio::spawn(Self::run_locations_checker(
add_locations_rx,
remove_locations_rx,
stop_rx,
));
{
let (location_management_tx, location_management_rx) = mpsc::channel(128);
let (watcher_management_tx, watcher_management_rx) = mpsc::channel(128);
let (stop_tx, stop_rx) = oneshot::channel();
#[cfg(feature = "location-watcher")]
tokio::spawn(Self::run_locations_checker(
location_management_rx,
watcher_management_rx,
stop_rx,
));
debug!("Location manager initialized");
Arc::new(Self {
location_management_tx,
watcher_management_tx,
stop_tx: Some(stop_tx),
})
}
#[cfg(not(feature = "location-watcher"))]
tracing::warn!("Location watcher is disabled, locations will not be checked");
{
tracing::warn!("Location watcher is disabled, locations will not be checked");
Arc::new(Self { stop_tx: None })
}
}
debug!("Location manager initialized");
#[inline]
#[allow(unused_variables)]
async fn location_management_message(
&self,
location_id: LocationId,
library_ctx: LibraryContext,
action: ManagementMessageAction,
) -> Result<(), LocationManagerError> {
#[cfg(feature = "location-watcher")]
{
let (tx, rx) = oneshot::channel();
Arc::new(Self {
add_locations_tx,
remove_locations_tx,
stop_tx: Some(stop_tx),
})
self.location_management_tx
.send(LocationManagementMessage {
location_id,
library_ctx,
action,
response_tx: tx,
})
.await?;
rx.await?
}
#[cfg(not(feature = "location-watcher"))]
Ok(())
}
#[inline]
#[allow(unused_variables)]
async fn watcher_management_message(
&self,
location_id: LocationId,
library_ctx: LibraryContext,
action: WatcherManagementMessageAction,
) -> Result<(), LocationManagerError> {
#[cfg(feature = "location-watcher")]
{
let (tx, rx) = oneshot::channel();
self.watcher_management_tx
.send(WatcherManagementMessage {
location_id,
library_ctx,
action,
response_tx: tx,
})
.await?;
rx.await?
}
#[cfg(not(feature = "location-watcher"))]
Ok(())
}
pub async fn add(
@ -84,17 +194,8 @@ impl LocationManager {
location_id: LocationId,
library_ctx: LibraryContext,
) -> Result<(), LocationManagerError> {
if cfg!(feature = "location-watcher") {
let (tx, rx) = oneshot::channel();
self.add_locations_tx
.send((location_id, library_ctx, tx))
.await?;
rx.await?
} else {
Ok(())
}
self.location_management_message(location_id, library_ctx, ManagementMessageAction::Add)
.await
}
pub async fn remove(
@ -102,23 +203,80 @@ impl LocationManager {
location_id: LocationId,
library_ctx: LibraryContext,
) -> Result<(), LocationManagerError> {
if cfg!(feature = "location-watcher") {
let (tx, rx) = oneshot::channel();
self.location_management_message(location_id, library_ctx, ManagementMessageAction::Remove)
.await
}
self.remove_locations_tx
.send((location_id, library_ctx, tx))
.await?;
pub async fn stop_watcher(
&self,
location_id: LocationId,
library_ctx: LibraryContext,
) -> Result<(), LocationManagerError> {
self.watcher_management_message(
location_id,
library_ctx,
WatcherManagementMessageAction::Stop,
)
.await
}
rx.await?
} else {
Ok(())
}
pub async fn reinit_watcher(
&self,
location_id: LocationId,
library_ctx: LibraryContext,
) -> Result<(), LocationManagerError> {
self.watcher_management_message(
location_id,
library_ctx,
WatcherManagementMessageAction::Reinit,
)
.await
}
pub async fn temporary_stop(
&self,
location_id: LocationId,
library_ctx: LibraryContext,
) -> Result<StopWatcherGuard, LocationManagerError> {
self.stop_watcher(location_id, library_ctx.clone()).await?;
Ok(StopWatcherGuard {
location_id,
library_ctx: Some(library_ctx),
manager: self,
})
}
pub async fn temporary_ignore_events_for_path(
&self,
location_id: LocationId,
library_ctx: LibraryContext,
path: impl AsRef<Path>,
) -> Result<IgnoreEventsForPathGuard, LocationManagerError> {
let path = path.as_ref().to_path_buf();
self.watcher_management_message(
location_id,
library_ctx.clone(),
WatcherManagementMessageAction::IgnoreEventsForPath {
path: path.clone(),
ignore: true,
},
)
.await?;
Ok(IgnoreEventsForPathGuard {
location_id,
library_ctx: Some(library_ctx),
manager: self,
path: Some(path),
})
}
#[cfg(feature = "location-watcher")]
async fn run_locations_checker(
mut add_locations_rx: mpsc::Receiver<ManagerMessage>,
mut remove_locations_rx: mpsc::Receiver<ManagerMessage>,
mut location_management_rx: mpsc::Receiver<LocationManagementMessage>,
mut watcher_management_rx: mpsc::Receiver<WatcherManagementMessage>,
mut stop_rx: oneshot::Receiver<()>,
) -> Result<(), LocationManagerError> {
use std::collections::{HashMap, HashSet};
@ -128,8 +286,9 @@ impl LocationManager {
use tracing::{info, warn};
use helpers::{
check_online, drop_location, get_location, location_check_sleep, unwatch_location,
watch_location,
check_online, drop_location, get_location, handle_ignore_path_request,
handle_reinit_watcher_request, handle_remove_location_request,
handle_stop_watcher_request, location_check_sleep, unwatch_location, watch_location,
};
use watcher::LocationWatcher;
@ -137,95 +296,133 @@ impl LocationManager {
let mut to_remove = HashSet::new();
let mut locations_watched = HashMap::new();
let mut locations_unwatched = HashMap::new();
let mut forced_unwatch = HashSet::new();
loop {
select! {
// To add a new location
Some((location_id, library_ctx, response_tx)) = add_locations_rx.recv() => {
if let Some(location) = get_location(location_id, &library_ctx).await {
let is_online = check_online(&location, &library_ctx).await;
let _ = response_tx.send(
LocationWatcher::new(location, library_ctx.clone())
.await
.map(|mut watcher| {
if is_online {
watcher.watch();
locations_watched.insert(
(location_id, library_ctx.id),
watcher
);
} else {
locations_unwatched.insert(
(location_id, library_ctx.id),
watcher
);
}
// Location management messages
Some(LocationManagementMessage{
location_id,
library_ctx,
action,
response_tx
}) = location_management_rx.recv() => {
match action {
to_check_futures.push(
location_check_sleep(location_id, library_ctx)
);
}
)
); // ignore errors, we handle errors on receiver
} else {
warn!(
"Location not found in database to be watched: {}",
location_id
);
// To add a new location
ManagementMessageAction::Add => {
if let Some(location) = get_location(location_id, &library_ctx).await {
let is_online = check_online(&location, &library_ctx).await;
let _ = response_tx.send(
LocationWatcher::new(location, library_ctx.clone())
.await
.map(|mut watcher| {
if is_online {
watcher.watch();
locations_watched.insert(
(location_id, library_ctx.id),
watcher
);
} else {
locations_unwatched.insert(
(location_id, library_ctx.id),
watcher
);
}
to_check_futures.push(
location_check_sleep(location_id, library_ctx)
);
}
)
); // ignore errors, we handle errors on receiver
} else {
warn!(
"Location not found in database to be watched: {}",
location_id
);
}
},
// To remove an location
ManagementMessageAction::Remove => {
handle_remove_location_request(
location_id,
library_ctx,
response_tx,
&mut forced_unwatch,
&mut locations_watched,
&mut locations_unwatched,
&mut to_remove,
).await;
},
}
}
// To remove an location
Some((location_id, library_ctx, response_tx)) = remove_locations_rx.recv() => {
if let Some(location) = get_location(location_id, &library_ctx).await {
if let Some(ref local_path_str) = location.local_path.clone() {
unwatch_location(
location,
library_ctx.id,
local_path_str,
// Watcher management messages
Some(WatcherManagementMessage{
location_id,
library_ctx,
action,
response_tx,
}) = watcher_management_rx.recv() => {
match action {
// To stop a watcher
WatcherManagementMessageAction::Stop => {
handle_stop_watcher_request(
location_id,
library_ctx,
response_tx,
&mut forced_unwatch,
&mut locations_watched,
&mut locations_unwatched,
);
locations_unwatched.remove(&(location_id, library_ctx.id));
} else {
drop_location(
).await;
},
// To reinit a stopped watcher
WatcherManagementMessageAction::Reinit => {
handle_reinit_watcher_request(
location_id,
library_ctx.id,
"Dropping location from location manager, because we don't have a `local_path` anymore",
library_ctx,
response_tx,
&mut forced_unwatch,
&mut locations_watched,
&mut locations_unwatched
&mut locations_unwatched,
).await;
},
// To ignore or not events for a path
WatcherManagementMessageAction::IgnoreEventsForPath { path, ignore } => {
handle_ignore_path_request(
location_id,
library_ctx,
path,
ignore,
response_tx,
&locations_watched,
);
}
} else {
drop_location(
location_id,
library_ctx.id,
"Removing location from manager, as we failed to fetch from db",
&mut locations_watched,
&mut locations_unwatched
);
},
}
// Marking location as removed, so we don't try to check it when the time comes
to_remove.insert((location_id, library_ctx.id));
let _ = response_tx.send(Ok(())); // ignore errors, we handle errors on receiver
}
// Periodically checking locations
Some((location_id, library_ctx)) = to_check_futures.next() => {
if to_remove.contains(&(location_id, library_ctx.id)) {
let key = (location_id, library_ctx.id);
if to_remove.contains(&key) {
// The time to check came for an already removed library, so we just ignore it
to_remove.remove(&(location_id, library_ctx.id));
to_remove.remove(&key);
} else if let Some(location) = get_location(location_id, &library_ctx).await {
if let Some(ref local_path_str) = location.local_path.clone() {
if check_online(&location, &library_ctx).await {
if check_online(&location, &library_ctx).await
&& !forced_unwatch.contains(&key)
{
watch_location(
location,
library_ctx.id,
local_path_str,
&mut locations_watched,
&mut locations_unwatched
&mut locations_unwatched,
);
} else {
unwatch_location(
@ -233,7 +430,7 @@ impl LocationManager {
library_ctx.id,
local_path_str,
&mut locations_watched,
&mut locations_unwatched
&mut locations_unwatched,
);
}
to_check_futures.push(location_check_sleep(location_id, library_ctx));
@ -241,10 +438,12 @@ impl LocationManager {
drop_location(
location_id,
library_ctx.id,
"Dropping location from location manager, because we don't have a `local_path` anymore",
"Dropping location from location manager, because \
we don't have a `local_path` anymore",
&mut locations_watched,
&mut locations_unwatched
);
forced_unwatch.remove(&key);
}
} else {
drop_location(
@ -252,8 +451,9 @@ impl LocationManager {
library_ctx.id,
"Removing location from manager, as we failed to fetch from db",
&mut locations_watched,
&mut locations_unwatched
&mut locations_unwatched,
);
forced_unwatch.remove(&key);
}
}
@ -277,3 +477,50 @@ impl Drop for LocationManager {
}
}
}
#[must_use = "this `StopWatcherGuard` must be held for some time, so the watcher is stopped"]
pub struct StopWatcherGuard<'m> {
manager: &'m LocationManager,
location_id: LocationId,
library_ctx: Option<LibraryContext>,
}
impl Drop for StopWatcherGuard<'_> {
fn drop(&mut self) {
if cfg!(feature = "location-watcher") {
// FIXME: change this Drop to async drop in the future
if let Err(e) = block_on(
self.manager
.reinit_watcher(self.location_id, self.library_ctx.take().unwrap()),
) {
error!("Failed to reinit watcher on stop watcher guard drop: {e}");
}
}
}
}
#[must_use = "this `IgnoreEventsForPathGuard` must be held for some time, so the watcher can ignore events for the desired path"]
pub struct IgnoreEventsForPathGuard<'m> {
manager: &'m LocationManager,
path: Option<PathBuf>,
location_id: LocationId,
library_ctx: Option<LibraryContext>,
}
impl Drop for IgnoreEventsForPathGuard<'_> {
fn drop(&mut self) {
if cfg!(feature = "location-watcher") {
// FIXME: change this Drop to async drop in the future
if let Err(e) = block_on(self.manager.watcher_management_message(
self.location_id,
self.library_ctx.take().unwrap(),
WatcherManagementMessageAction::IgnoreEventsForPath {
path: self.path.take().unwrap(),
ignore: false,
},
)) {
error!("Failed to un-ignore path on watcher guard drop: {e}");
}
}
}
}

View file

@ -3,7 +3,10 @@ use crate::{
prisma::{file_path, location},
};
use std::path::{Path, PathBuf};
use std::{
collections::HashSet,
path::{Path, PathBuf},
};
use async_trait::async_trait;
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
@ -39,6 +42,8 @@ type Handler = windows::WindowsEventHandler;
file_path::include!(file_path_with_object { object });
pub(super) type IgnorePath = (PathBuf, bool);
#[async_trait]
trait EventHandler {
fn new() -> Self
@ -58,6 +63,7 @@ pub(super) struct LocationWatcher {
location: location::Data,
path: PathBuf,
watcher: RecommendedWatcher,
ignore_path_tx: mpsc::UnboundedSender<IgnorePath>,
handle: Option<JoinHandle<()>>,
stop_tx: Option<oneshot::Sender<()>>,
}
@ -68,6 +74,7 @@ impl LocationWatcher {
library_ctx: LibraryContext,
) -> Result<Self, LocationManagerError> {
let (events_tx, events_rx) = mpsc::unbounded_channel();
let (ignore_path_tx, ignore_path_rx) = mpsc::unbounded_channel();
let (stop_tx, stop_rx) = oneshot::channel();
let watcher = RecommendedWatcher::new(
@ -89,12 +96,6 @@ impl LocationWatcher {
Config::default(),
)?;
let handle = tokio::spawn(Self::handle_watch_events(
location.id,
library_ctx,
events_rx,
stop_rx,
));
let path = PathBuf::from(
location
.local_path
@ -102,10 +103,19 @@ impl LocationWatcher {
.ok_or(LocationManagerError::LocationMissingLocalPath(location.id))?,
);
let handle = tokio::spawn(Self::handle_watch_events(
location.id,
library_ctx,
events_rx,
ignore_path_rx,
stop_rx,
));
Ok(Self {
location,
path,
watcher,
ignore_path_tx,
handle: Some(handle),
stop_tx: Some(stop_tx),
})
@ -115,10 +125,13 @@ impl LocationWatcher {
location_id: LocationId,
library_ctx: LibraryContext,
mut events_rx: mpsc::UnboundedReceiver<notify::Result<Event>>,
mut ignore_path_rx: mpsc::UnboundedReceiver<IgnorePath>,
mut stop_rx: oneshot::Receiver<()>,
) {
let mut event_handler = Handler::new();
let mut paths_to_ignore = HashSet::new();
loop {
select! {
Some(event) = events_rx.recv() => {
@ -128,7 +141,8 @@ impl LocationWatcher {
location_id,
event,
&mut event_handler,
&library_ctx
&library_ctx,
&paths_to_ignore,
).await {
error!("Failed to handle location file system event: \
<id='{location_id}', error='{e:#?}'>",
@ -140,6 +154,15 @@ impl LocationWatcher {
}
}
}
Some((path, ignore)) = ignore_path_rx.recv() => {
if ignore {
paths_to_ignore.insert(path);
} else {
paths_to_ignore.remove(&path);
}
}
_ = &mut stop_rx => {
debug!("Stop Location Manager event handler for location: <id='{}'>", location_id);
break
@ -153,8 +176,9 @@ impl LocationWatcher {
event: Event,
event_handler: &mut impl EventHandler,
library_ctx: &LibraryContext,
ignore_paths: &HashSet<PathBuf>,
) -> Result<(), LocationManagerError> {
if check_event(&event) {
if check_event(&event, ignore_paths) {
if let Some(location) = fetch_location(library_ctx, location_id)
.include(indexer_job_location::include())
.exec()
@ -175,6 +199,14 @@ impl LocationWatcher {
Ok(())
}
pub(super) fn ignore_path(
&self,
path: PathBuf,
ignore: bool,
) -> Result<(), LocationManagerError> {
self.ignore_path_tx.send((path, ignore)).map_err(Into::into)
}
pub(super) fn check_path(&self, path: impl AsRef<Path>) -> bool {
self.path == path.as_ref()
}

View file

@ -18,6 +18,7 @@ use crate::{
};
use std::{
collections::HashSet,
path::{Path, PathBuf},
str::FromStr,
};
@ -46,12 +47,13 @@ pub(super) fn check_location_online(location: &indexer_job_location::Data) -> bo
}
}
pub(super) fn check_event(event: &Event) -> bool {
pub(super) fn check_event(event: &Event, ignore_paths: &HashSet<PathBuf>) -> bool {
// if first path includes .DS_Store, ignore
if event.paths.iter().any(|p| {
p.to_str()
.expect("Found non-UTF-8 path")
.contains(".DS_Store")
|| ignore_paths.contains(p)
}) {
return false;
}

View file

@ -1,9 +1,4 @@
use std::{
collections::VecDeque,
fs::{self, File},
io::Read,
path::PathBuf,
};
use std::{collections::VecDeque, fs::File, io::Read, path::PathBuf};
use tokio::task;
@ -64,8 +59,8 @@ const JOB_NAME: &str = "file_encryptor";
#[async_trait::async_trait]
impl StatefulJob for FileEncryptorJob {
type Data = FileEncryptorJobState;
type Init = FileEncryptorJobInit;
type Data = FileEncryptorJobState;
type Step = FileEncryptorJobStep;
fn name(&self) -> &'static str {
@ -136,8 +131,18 @@ impl StatefulJob for FileEncryptorJob {
Ok,
)?;
let mut reader = File::open(info.obj_path.clone())?;
let mut writer = File::create(output_path)?;
let _guard = ctx
.library_ctx
.location_manager()
.temporary_ignore_events_for_path(
state.init.location_id,
ctx.library_ctx.clone(),
&output_path,
)
.await?;
let mut reader = task::block_in_place(|| File::open(&info.obj_path))?;
let mut writer = task::block_in_place(|| File::create(output_path))?;
let master_key = generate_master_key();
@ -202,7 +207,7 @@ impl StatefulJob for FileEncryptorJob {
.join("thumbnails")
.join(object.cas_id + ".webp");
if fs::metadata(pvm_path.clone()).is_ok() {
if tokio::fs::metadata(&pvm_path).await.is_ok() {
let mut pvm_bytes = Vec::new();
task::block_in_place(|| {
let mut pvm_file = File::open(pvm_path)?;