mirror of
https://github.com/spacedriveapp/spacedrive
synced 2024-07-08 07:12:49 +00:00
Merge branch 'main' of github.com:spacedriveapp/spacedrive into consistent-formatting-please
This commit is contained in:
commit
49218cb1bb
|
@ -3,8 +3,8 @@ import React, { Suspense } from 'react';
|
|||
import ReactDOM from 'react-dom/client';
|
||||
import '@sd/ui/style';
|
||||
// THIS MUST GO BEFORE importing the App
|
||||
import '~/patches';
|
||||
import App from './App';
|
||||
import './patches';
|
||||
|
||||
const root = ReactDOM.createRoot(document.getElementById('root') as HTMLElement);
|
||||
root.render(
|
||||
|
|
|
@ -2,7 +2,10 @@
|
|||
"extends": "../../packages/config/base.tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"rootDir": "src",
|
||||
"declarationDir": "dist"
|
||||
"declarationDir": "dist",
|
||||
"paths": {
|
||||
"~/*": ["./src/*"]
|
||||
}
|
||||
},
|
||||
"include": ["src"],
|
||||
"references": [
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use crate::{
|
||||
location::{indexer::IndexerError, LocationError},
|
||||
location::{indexer::IndexerError, LocationError, LocationManagerError},
|
||||
object::{identifier_job::IdentifierJobError, preview::ThumbnailError},
|
||||
};
|
||||
|
||||
|
@ -45,6 +45,8 @@ pub enum JobError {
|
|||
MissingJobDataState(Uuid, String),
|
||||
#[error("missing some job data: '{value}'")]
|
||||
MissingData { value: String },
|
||||
#[error("Location manager error: {0}")]
|
||||
LocationManager(#[from] LocationManagerError),
|
||||
|
||||
// Specific job errors
|
||||
#[error("Indexer error: {0}")]
|
||||
|
|
|
@ -77,6 +77,11 @@ impl Node {
|
|||
.parse()
|
||||
.expect("Error invalid tracing directive!"),
|
||||
)
|
||||
.add_directive(
|
||||
"sd_core::location::manager=info"
|
||||
.parse()
|
||||
.expect("Error invalid tracing directive!"),
|
||||
)
|
||||
.add_directive(
|
||||
"sd_core_mobile=debug"
|
||||
.parse()
|
||||
|
|
|
@ -1,16 +1,16 @@
|
|||
use crate::{library::LibraryContext, prisma::location};
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
collections::{HashMap, HashSet},
|
||||
path::{Path, PathBuf},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use tokio::{fs, io::ErrorKind, time::sleep};
|
||||
use tokio::{fs, io::ErrorKind, sync::oneshot, time::sleep};
|
||||
use tracing::{error, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{watcher::LocationWatcher, LocationId};
|
||||
use super::{watcher::LocationWatcher, LocationId, LocationManagerError};
|
||||
|
||||
type LibraryId = Uuid;
|
||||
type LocationAndLibraryKey = (LocationId, LibraryId);
|
||||
|
@ -163,3 +163,176 @@ pub(super) fn subtract_location_path(
|
|||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn handle_remove_location_request(
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
to_remove: &mut HashSet<LocationAndLibraryKey>,
|
||||
) {
|
||||
let key = (location_id, library_ctx.id);
|
||||
if let Some(location) = get_location(location_id, &library_ctx).await {
|
||||
if let Some(ref local_path_str) = location.local_path.clone() {
|
||||
unwatch_location(
|
||||
location,
|
||||
library_ctx.id,
|
||||
local_path_str,
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
);
|
||||
locations_unwatched.remove(&key);
|
||||
forced_unwatch.remove(&key);
|
||||
} else {
|
||||
drop_location(
|
||||
location_id,
|
||||
library_ctx.id,
|
||||
"Dropping location from location manager, because we don't have a `local_path` anymore",
|
||||
locations_watched,
|
||||
locations_unwatched
|
||||
);
|
||||
}
|
||||
} else {
|
||||
drop_location(
|
||||
location_id,
|
||||
library_ctx.id,
|
||||
"Removing location from manager, as we failed to fetch from db",
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
);
|
||||
}
|
||||
|
||||
// Marking location as removed, so we don't try to check it when the time comes
|
||||
to_remove.insert(key);
|
||||
|
||||
let _ = response_tx.send(Ok(())); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
||||
pub(super) async fn handle_stop_watcher_request(
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
async fn inner(
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
let key = (location_id, library_ctx.id);
|
||||
if !forced_unwatch.contains(&key) && locations_watched.contains_key(&key) {
|
||||
get_location(location_id, &library_ctx)
|
||||
.await
|
||||
.ok_or_else(|| LocationManagerError::FailedToStopOrReinitWatcher {
|
||||
reason: String::from("failed to fetch location from db"),
|
||||
})
|
||||
.map(|location| {
|
||||
location
|
||||
.local_path
|
||||
.clone()
|
||||
.ok_or(LocationManagerError::LocationMissingLocalPath(location_id))
|
||||
.map(|local_path_str| {
|
||||
unwatch_location(
|
||||
location,
|
||||
library_ctx.id,
|
||||
local_path_str,
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
);
|
||||
forced_unwatch.insert(key);
|
||||
})
|
||||
})?
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let _ = response_tx.send(
|
||||
inner(
|
||||
location_id,
|
||||
library_ctx,
|
||||
forced_unwatch,
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
)
|
||||
.await,
|
||||
); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
||||
pub(super) async fn handle_reinit_watcher_request(
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
async fn inner(
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
let key = (location_id, library_ctx.id);
|
||||
if forced_unwatch.contains(&key) && locations_unwatched.contains_key(&key) {
|
||||
get_location(location_id, &library_ctx)
|
||||
.await
|
||||
.ok_or_else(|| LocationManagerError::FailedToStopOrReinitWatcher {
|
||||
reason: String::from("failed to fetch location from db"),
|
||||
})
|
||||
.map(|location| {
|
||||
location
|
||||
.local_path
|
||||
.clone()
|
||||
.ok_or(LocationManagerError::LocationMissingLocalPath(location_id))
|
||||
.map(|local_path_str| {
|
||||
watch_location(
|
||||
location,
|
||||
library_ctx.id,
|
||||
local_path_str,
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
);
|
||||
forced_unwatch.remove(&key);
|
||||
})
|
||||
})?
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let _ = response_tx.send(
|
||||
inner(
|
||||
location_id,
|
||||
library_ctx,
|
||||
forced_unwatch,
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
)
|
||||
.await,
|
||||
); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
||||
pub(super) fn handle_ignore_path_request(
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
path: PathBuf,
|
||||
ignore: bool,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
locations_watched: &HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
let _ = response_tx.send(
|
||||
if let Some(watcher) = locations_watched.get(&(location_id, library_ctx.id)) {
|
||||
watcher.ignore_path(path, ignore)
|
||||
} else {
|
||||
Ok(())
|
||||
},
|
||||
); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
|
|
@ -1,13 +1,20 @@
|
|||
use crate::library::LibraryContext;
|
||||
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
io,
|
||||
sync::{mpsc, oneshot},
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::{debug, error};
|
||||
|
||||
use futures::executor::block_on;
|
||||
use thiserror::Error;
|
||||
use tokio::{io, sync::oneshot};
|
||||
use tracing::error;
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
use tracing::debug;
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
mod watcher;
|
||||
|
@ -17,16 +24,53 @@ mod helpers;
|
|||
|
||||
pub type LocationId = i32;
|
||||
|
||||
type ManagerMessage = (
|
||||
LocationId,
|
||||
LibraryContext,
|
||||
oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
);
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
#[allow(dead_code)]
|
||||
enum ManagementMessageAction {
|
||||
Add,
|
||||
Remove,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub struct LocationManagementMessage {
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
action: ManagementMessageAction,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
enum WatcherManagementMessageAction {
|
||||
Stop,
|
||||
Reinit,
|
||||
IgnoreEventsForPath { path: PathBuf, ignore: bool },
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub struct WatcherManagementMessage {
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
action: WatcherManagementMessageAction,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum LocationManagerError {
|
||||
#[error("Unable to send location id to be checked by actor: (error: {0})")]
|
||||
ActorSendLocationError(#[from] mpsc::error::SendError<ManagerMessage>),
|
||||
#[cfg(feature = "location-watcher")]
|
||||
#[error("Unable to send location management message to location manager actor: (error: {0})")]
|
||||
ActorSendLocationError(#[from] mpsc::error::SendError<LocationManagementMessage>),
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
#[error("Unable to send path to be ignored by watcher actor: (error: {0})")]
|
||||
ActorIgnorePathError(#[from] mpsc::error::SendError<watcher::IgnorePath>),
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
#[error("Unable to watcher management message to watcher manager actor: (error: {0})")]
|
||||
ActorIgnorePathMessageError(#[from] mpsc::error::SendError<WatcherManagementMessage>),
|
||||
|
||||
#[error("Unable to receive actor response: (error: {0})")]
|
||||
ActorResponseError(#[from] oneshot::error::RecvError),
|
||||
|
||||
|
@ -34,6 +78,9 @@ pub enum LocationManagerError {
|
|||
#[error("Watcher error: (error: {0})")]
|
||||
WatcherError(#[from] notify::Error),
|
||||
|
||||
#[error("Failed to stop or reinit a watcher: {reason}")]
|
||||
FailedToStopOrReinitWatcher { reason: String },
|
||||
|
||||
#[error("Location missing local path: <id='{0}'>")]
|
||||
LocationMissingLocalPath(LocationId),
|
||||
#[error("Tried to update a non-existing file: <path='{0}'>")]
|
||||
|
@ -48,35 +95,98 @@ pub enum LocationManagerError {
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct LocationManager {
|
||||
add_locations_tx: mpsc::Sender<ManagerMessage>,
|
||||
remove_locations_tx: mpsc::Sender<ManagerMessage>,
|
||||
#[cfg(feature = "location-watcher")]
|
||||
location_management_tx: mpsc::Sender<LocationManagementMessage>,
|
||||
#[cfg(feature = "location-watcher")]
|
||||
watcher_management_tx: mpsc::Sender<WatcherManagementMessage>,
|
||||
stop_tx: Option<oneshot::Sender<()>>,
|
||||
}
|
||||
|
||||
impl LocationManager {
|
||||
#[allow(unused)]
|
||||
pub fn new() -> Arc<Self> {
|
||||
let (add_locations_tx, add_locations_rx) = mpsc::channel(128);
|
||||
let (remove_locations_tx, remove_locations_rx) = mpsc::channel(128);
|
||||
let (stop_tx, stop_rx) = oneshot::channel();
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
tokio::spawn(Self::run_locations_checker(
|
||||
add_locations_rx,
|
||||
remove_locations_rx,
|
||||
stop_rx,
|
||||
));
|
||||
{
|
||||
let (location_management_tx, location_management_rx) = mpsc::channel(128);
|
||||
let (watcher_management_tx, watcher_management_rx) = mpsc::channel(128);
|
||||
let (stop_tx, stop_rx) = oneshot::channel();
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
tokio::spawn(Self::run_locations_checker(
|
||||
location_management_rx,
|
||||
watcher_management_rx,
|
||||
stop_rx,
|
||||
));
|
||||
|
||||
debug!("Location manager initialized");
|
||||
|
||||
Arc::new(Self {
|
||||
location_management_tx,
|
||||
watcher_management_tx,
|
||||
stop_tx: Some(stop_tx),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "location-watcher"))]
|
||||
tracing::warn!("Location watcher is disabled, locations will not be checked");
|
||||
{
|
||||
tracing::warn!("Location watcher is disabled, locations will not be checked");
|
||||
Arc::new(Self { stop_tx: None })
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Location manager initialized");
|
||||
#[inline]
|
||||
#[allow(unused_variables)]
|
||||
async fn location_management_message(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
action: ManagementMessageAction,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
#[cfg(feature = "location-watcher")]
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
Arc::new(Self {
|
||||
add_locations_tx,
|
||||
remove_locations_tx,
|
||||
stop_tx: Some(stop_tx),
|
||||
})
|
||||
self.location_management_tx
|
||||
.send(LocationManagementMessage {
|
||||
location_id,
|
||||
library_ctx,
|
||||
action,
|
||||
response_tx: tx,
|
||||
})
|
||||
.await?;
|
||||
|
||||
rx.await?
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "location-watcher"))]
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[allow(unused_variables)]
|
||||
async fn watcher_management_message(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
action: WatcherManagementMessageAction,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
#[cfg(feature = "location-watcher")]
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.watcher_management_tx
|
||||
.send(WatcherManagementMessage {
|
||||
location_id,
|
||||
library_ctx,
|
||||
action,
|
||||
response_tx: tx,
|
||||
})
|
||||
.await?;
|
||||
|
||||
rx.await?
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "location-watcher"))]
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add(
|
||||
|
@ -84,17 +194,8 @@ impl LocationManager {
|
|||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
if cfg!(feature = "location-watcher") {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.add_locations_tx
|
||||
.send((location_id, library_ctx, tx))
|
||||
.await?;
|
||||
|
||||
rx.await?
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
self.location_management_message(location_id, library_ctx, ManagementMessageAction::Add)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn remove(
|
||||
|
@ -102,23 +203,80 @@ impl LocationManager {
|
|||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
if cfg!(feature = "location-watcher") {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.location_management_message(location_id, library_ctx, ManagementMessageAction::Remove)
|
||||
.await
|
||||
}
|
||||
|
||||
self.remove_locations_tx
|
||||
.send((location_id, library_ctx, tx))
|
||||
.await?;
|
||||
pub async fn stop_watcher(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
self.watcher_management_message(
|
||||
location_id,
|
||||
library_ctx,
|
||||
WatcherManagementMessageAction::Stop,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
rx.await?
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
pub async fn reinit_watcher(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
self.watcher_management_message(
|
||||
location_id,
|
||||
library_ctx,
|
||||
WatcherManagementMessageAction::Reinit,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn temporary_stop(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
) -> Result<StopWatcherGuard, LocationManagerError> {
|
||||
self.stop_watcher(location_id, library_ctx.clone()).await?;
|
||||
|
||||
Ok(StopWatcherGuard {
|
||||
location_id,
|
||||
library_ctx: Some(library_ctx),
|
||||
manager: self,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn temporary_ignore_events_for_path(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
path: impl AsRef<Path>,
|
||||
) -> Result<IgnoreEventsForPathGuard, LocationManagerError> {
|
||||
let path = path.as_ref().to_path_buf();
|
||||
|
||||
self.watcher_management_message(
|
||||
location_id,
|
||||
library_ctx.clone(),
|
||||
WatcherManagementMessageAction::IgnoreEventsForPath {
|
||||
path: path.clone(),
|
||||
ignore: true,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(IgnoreEventsForPathGuard {
|
||||
location_id,
|
||||
library_ctx: Some(library_ctx),
|
||||
manager: self,
|
||||
path: Some(path),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
async fn run_locations_checker(
|
||||
mut add_locations_rx: mpsc::Receiver<ManagerMessage>,
|
||||
mut remove_locations_rx: mpsc::Receiver<ManagerMessage>,
|
||||
mut location_management_rx: mpsc::Receiver<LocationManagementMessage>,
|
||||
mut watcher_management_rx: mpsc::Receiver<WatcherManagementMessage>,
|
||||
mut stop_rx: oneshot::Receiver<()>,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
@ -128,8 +286,9 @@ impl LocationManager {
|
|||
use tracing::{info, warn};
|
||||
|
||||
use helpers::{
|
||||
check_online, drop_location, get_location, location_check_sleep, unwatch_location,
|
||||
watch_location,
|
||||
check_online, drop_location, get_location, handle_ignore_path_request,
|
||||
handle_reinit_watcher_request, handle_remove_location_request,
|
||||
handle_stop_watcher_request, location_check_sleep, unwatch_location, watch_location,
|
||||
};
|
||||
use watcher::LocationWatcher;
|
||||
|
||||
|
@ -137,95 +296,133 @@ impl LocationManager {
|
|||
let mut to_remove = HashSet::new();
|
||||
let mut locations_watched = HashMap::new();
|
||||
let mut locations_unwatched = HashMap::new();
|
||||
let mut forced_unwatch = HashSet::new();
|
||||
|
||||
loop {
|
||||
select! {
|
||||
// To add a new location
|
||||
Some((location_id, library_ctx, response_tx)) = add_locations_rx.recv() => {
|
||||
if let Some(location) = get_location(location_id, &library_ctx).await {
|
||||
let is_online = check_online(&location, &library_ctx).await;
|
||||
let _ = response_tx.send(
|
||||
LocationWatcher::new(location, library_ctx.clone())
|
||||
.await
|
||||
.map(|mut watcher| {
|
||||
if is_online {
|
||||
watcher.watch();
|
||||
locations_watched.insert(
|
||||
(location_id, library_ctx.id),
|
||||
watcher
|
||||
);
|
||||
} else {
|
||||
locations_unwatched.insert(
|
||||
(location_id, library_ctx.id),
|
||||
watcher
|
||||
);
|
||||
}
|
||||
// Location management messages
|
||||
Some(LocationManagementMessage{
|
||||
location_id,
|
||||
library_ctx,
|
||||
action,
|
||||
response_tx
|
||||
}) = location_management_rx.recv() => {
|
||||
match action {
|
||||
|
||||
to_check_futures.push(
|
||||
location_check_sleep(location_id, library_ctx)
|
||||
);
|
||||
}
|
||||
)
|
||||
); // ignore errors, we handle errors on receiver
|
||||
} else {
|
||||
warn!(
|
||||
"Location not found in database to be watched: {}",
|
||||
location_id
|
||||
);
|
||||
// To add a new location
|
||||
ManagementMessageAction::Add => {
|
||||
if let Some(location) = get_location(location_id, &library_ctx).await {
|
||||
let is_online = check_online(&location, &library_ctx).await;
|
||||
let _ = response_tx.send(
|
||||
LocationWatcher::new(location, library_ctx.clone())
|
||||
.await
|
||||
.map(|mut watcher| {
|
||||
if is_online {
|
||||
watcher.watch();
|
||||
locations_watched.insert(
|
||||
(location_id, library_ctx.id),
|
||||
watcher
|
||||
);
|
||||
} else {
|
||||
locations_unwatched.insert(
|
||||
(location_id, library_ctx.id),
|
||||
watcher
|
||||
);
|
||||
}
|
||||
|
||||
to_check_futures.push(
|
||||
location_check_sleep(location_id, library_ctx)
|
||||
);
|
||||
}
|
||||
)
|
||||
); // ignore errors, we handle errors on receiver
|
||||
} else {
|
||||
warn!(
|
||||
"Location not found in database to be watched: {}",
|
||||
location_id
|
||||
);
|
||||
}
|
||||
},
|
||||
|
||||
// To remove an location
|
||||
ManagementMessageAction::Remove => {
|
||||
handle_remove_location_request(
|
||||
location_id,
|
||||
library_ctx,
|
||||
response_tx,
|
||||
&mut forced_unwatch,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
&mut to_remove,
|
||||
).await;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// To remove an location
|
||||
Some((location_id, library_ctx, response_tx)) = remove_locations_rx.recv() => {
|
||||
if let Some(location) = get_location(location_id, &library_ctx).await {
|
||||
if let Some(ref local_path_str) = location.local_path.clone() {
|
||||
unwatch_location(
|
||||
location,
|
||||
library_ctx.id,
|
||||
local_path_str,
|
||||
// Watcher management messages
|
||||
Some(WatcherManagementMessage{
|
||||
location_id,
|
||||
library_ctx,
|
||||
action,
|
||||
response_tx,
|
||||
}) = watcher_management_rx.recv() => {
|
||||
match action {
|
||||
// To stop a watcher
|
||||
WatcherManagementMessageAction::Stop => {
|
||||
handle_stop_watcher_request(
|
||||
location_id,
|
||||
library_ctx,
|
||||
response_tx,
|
||||
&mut forced_unwatch,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
);
|
||||
locations_unwatched.remove(&(location_id, library_ctx.id));
|
||||
} else {
|
||||
drop_location(
|
||||
).await;
|
||||
},
|
||||
|
||||
// To reinit a stopped watcher
|
||||
WatcherManagementMessageAction::Reinit => {
|
||||
handle_reinit_watcher_request(
|
||||
location_id,
|
||||
library_ctx.id,
|
||||
"Dropping location from location manager, because we don't have a `local_path` anymore",
|
||||
library_ctx,
|
||||
response_tx,
|
||||
&mut forced_unwatch,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched
|
||||
&mut locations_unwatched,
|
||||
).await;
|
||||
},
|
||||
|
||||
// To ignore or not events for a path
|
||||
WatcherManagementMessageAction::IgnoreEventsForPath { path, ignore } => {
|
||||
handle_ignore_path_request(
|
||||
location_id,
|
||||
library_ctx,
|
||||
path,
|
||||
ignore,
|
||||
response_tx,
|
||||
&locations_watched,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
drop_location(
|
||||
location_id,
|
||||
library_ctx.id,
|
||||
"Removing location from manager, as we failed to fetch from db",
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched
|
||||
);
|
||||
},
|
||||
}
|
||||
|
||||
// Marking location as removed, so we don't try to check it when the time comes
|
||||
to_remove.insert((location_id, library_ctx.id));
|
||||
|
||||
let _ = response_tx.send(Ok(())); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
||||
// Periodically checking locations
|
||||
Some((location_id, library_ctx)) = to_check_futures.next() => {
|
||||
if to_remove.contains(&(location_id, library_ctx.id)) {
|
||||
let key = (location_id, library_ctx.id);
|
||||
|
||||
if to_remove.contains(&key) {
|
||||
// The time to check came for an already removed library, so we just ignore it
|
||||
to_remove.remove(&(location_id, library_ctx.id));
|
||||
to_remove.remove(&key);
|
||||
} else if let Some(location) = get_location(location_id, &library_ctx).await {
|
||||
if let Some(ref local_path_str) = location.local_path.clone() {
|
||||
if check_online(&location, &library_ctx).await {
|
||||
if check_online(&location, &library_ctx).await
|
||||
&& !forced_unwatch.contains(&key)
|
||||
{
|
||||
watch_location(
|
||||
location,
|
||||
library_ctx.id,
|
||||
local_path_str,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched
|
||||
&mut locations_unwatched,
|
||||
);
|
||||
} else {
|
||||
unwatch_location(
|
||||
|
@ -233,7 +430,7 @@ impl LocationManager {
|
|||
library_ctx.id,
|
||||
local_path_str,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched
|
||||
&mut locations_unwatched,
|
||||
);
|
||||
}
|
||||
to_check_futures.push(location_check_sleep(location_id, library_ctx));
|
||||
|
@ -241,10 +438,12 @@ impl LocationManager {
|
|||
drop_location(
|
||||
location_id,
|
||||
library_ctx.id,
|
||||
"Dropping location from location manager, because we don't have a `local_path` anymore",
|
||||
"Dropping location from location manager, because \
|
||||
we don't have a `local_path` anymore",
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched
|
||||
);
|
||||
forced_unwatch.remove(&key);
|
||||
}
|
||||
} else {
|
||||
drop_location(
|
||||
|
@ -252,8 +451,9 @@ impl LocationManager {
|
|||
library_ctx.id,
|
||||
"Removing location from manager, as we failed to fetch from db",
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched
|
||||
&mut locations_unwatched,
|
||||
);
|
||||
forced_unwatch.remove(&key);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -277,3 +477,50 @@ impl Drop for LocationManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use = "this `StopWatcherGuard` must be held for some time, so the watcher is stopped"]
|
||||
pub struct StopWatcherGuard<'m> {
|
||||
manager: &'m LocationManager,
|
||||
location_id: LocationId,
|
||||
library_ctx: Option<LibraryContext>,
|
||||
}
|
||||
|
||||
impl Drop for StopWatcherGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
if cfg!(feature = "location-watcher") {
|
||||
// FIXME: change this Drop to async drop in the future
|
||||
if let Err(e) = block_on(
|
||||
self.manager
|
||||
.reinit_watcher(self.location_id, self.library_ctx.take().unwrap()),
|
||||
) {
|
||||
error!("Failed to reinit watcher on stop watcher guard drop: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use = "this `IgnoreEventsForPathGuard` must be held for some time, so the watcher can ignore events for the desired path"]
|
||||
pub struct IgnoreEventsForPathGuard<'m> {
|
||||
manager: &'m LocationManager,
|
||||
path: Option<PathBuf>,
|
||||
location_id: LocationId,
|
||||
library_ctx: Option<LibraryContext>,
|
||||
}
|
||||
|
||||
impl Drop for IgnoreEventsForPathGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
if cfg!(feature = "location-watcher") {
|
||||
// FIXME: change this Drop to async drop in the future
|
||||
if let Err(e) = block_on(self.manager.watcher_management_message(
|
||||
self.location_id,
|
||||
self.library_ctx.take().unwrap(),
|
||||
WatcherManagementMessageAction::IgnoreEventsForPath {
|
||||
path: self.path.take().unwrap(),
|
||||
ignore: false,
|
||||
},
|
||||
)) {
|
||||
error!("Failed to un-ignore path on watcher guard drop: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,10 @@ use crate::{
|
|||
prisma::{file_path, location},
|
||||
};
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
|
||||
|
@ -39,6 +42,8 @@ type Handler = windows::WindowsEventHandler;
|
|||
|
||||
file_path::include!(file_path_with_object { object });
|
||||
|
||||
pub(super) type IgnorePath = (PathBuf, bool);
|
||||
|
||||
#[async_trait]
|
||||
trait EventHandler {
|
||||
fn new() -> Self
|
||||
|
@ -58,6 +63,7 @@ pub(super) struct LocationWatcher {
|
|||
location: location::Data,
|
||||
path: PathBuf,
|
||||
watcher: RecommendedWatcher,
|
||||
ignore_path_tx: mpsc::UnboundedSender<IgnorePath>,
|
||||
handle: Option<JoinHandle<()>>,
|
||||
stop_tx: Option<oneshot::Sender<()>>,
|
||||
}
|
||||
|
@ -68,6 +74,7 @@ impl LocationWatcher {
|
|||
library_ctx: LibraryContext,
|
||||
) -> Result<Self, LocationManagerError> {
|
||||
let (events_tx, events_rx) = mpsc::unbounded_channel();
|
||||
let (ignore_path_tx, ignore_path_rx) = mpsc::unbounded_channel();
|
||||
let (stop_tx, stop_rx) = oneshot::channel();
|
||||
|
||||
let watcher = RecommendedWatcher::new(
|
||||
|
@ -89,12 +96,6 @@ impl LocationWatcher {
|
|||
Config::default(),
|
||||
)?;
|
||||
|
||||
let handle = tokio::spawn(Self::handle_watch_events(
|
||||
location.id,
|
||||
library_ctx,
|
||||
events_rx,
|
||||
stop_rx,
|
||||
));
|
||||
let path = PathBuf::from(
|
||||
location
|
||||
.local_path
|
||||
|
@ -102,10 +103,19 @@ impl LocationWatcher {
|
|||
.ok_or(LocationManagerError::LocationMissingLocalPath(location.id))?,
|
||||
);
|
||||
|
||||
let handle = tokio::spawn(Self::handle_watch_events(
|
||||
location.id,
|
||||
library_ctx,
|
||||
events_rx,
|
||||
ignore_path_rx,
|
||||
stop_rx,
|
||||
));
|
||||
|
||||
Ok(Self {
|
||||
location,
|
||||
path,
|
||||
watcher,
|
||||
ignore_path_tx,
|
||||
handle: Some(handle),
|
||||
stop_tx: Some(stop_tx),
|
||||
})
|
||||
|
@ -115,10 +125,13 @@ impl LocationWatcher {
|
|||
location_id: LocationId,
|
||||
library_ctx: LibraryContext,
|
||||
mut events_rx: mpsc::UnboundedReceiver<notify::Result<Event>>,
|
||||
mut ignore_path_rx: mpsc::UnboundedReceiver<IgnorePath>,
|
||||
mut stop_rx: oneshot::Receiver<()>,
|
||||
) {
|
||||
let mut event_handler = Handler::new();
|
||||
|
||||
let mut paths_to_ignore = HashSet::new();
|
||||
|
||||
loop {
|
||||
select! {
|
||||
Some(event) = events_rx.recv() => {
|
||||
|
@ -128,7 +141,8 @@ impl LocationWatcher {
|
|||
location_id,
|
||||
event,
|
||||
&mut event_handler,
|
||||
&library_ctx
|
||||
&library_ctx,
|
||||
&paths_to_ignore,
|
||||
).await {
|
||||
error!("Failed to handle location file system event: \
|
||||
<id='{location_id}', error='{e:#?}'>",
|
||||
|
@ -140,6 +154,15 @@ impl LocationWatcher {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
Some((path, ignore)) = ignore_path_rx.recv() => {
|
||||
if ignore {
|
||||
paths_to_ignore.insert(path);
|
||||
} else {
|
||||
paths_to_ignore.remove(&path);
|
||||
}
|
||||
}
|
||||
|
||||
_ = &mut stop_rx => {
|
||||
debug!("Stop Location Manager event handler for location: <id='{}'>", location_id);
|
||||
break
|
||||
|
@ -153,8 +176,9 @@ impl LocationWatcher {
|
|||
event: Event,
|
||||
event_handler: &mut impl EventHandler,
|
||||
library_ctx: &LibraryContext,
|
||||
ignore_paths: &HashSet<PathBuf>,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
if check_event(&event) {
|
||||
if check_event(&event, ignore_paths) {
|
||||
if let Some(location) = fetch_location(library_ctx, location_id)
|
||||
.include(indexer_job_location::include())
|
||||
.exec()
|
||||
|
@ -175,6 +199,14 @@ impl LocationWatcher {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) fn ignore_path(
|
||||
&self,
|
||||
path: PathBuf,
|
||||
ignore: bool,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
self.ignore_path_tx.send((path, ignore)).map_err(Into::into)
|
||||
}
|
||||
|
||||
pub(super) fn check_path(&self, path: impl AsRef<Path>) -> bool {
|
||||
self.path == path.as_ref()
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ use crate::{
|
|||
};
|
||||
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
path::{Path, PathBuf},
|
||||
str::FromStr,
|
||||
};
|
||||
|
@ -46,12 +47,13 @@ pub(super) fn check_location_online(location: &indexer_job_location::Data) -> bo
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) fn check_event(event: &Event) -> bool {
|
||||
pub(super) fn check_event(event: &Event, ignore_paths: &HashSet<PathBuf>) -> bool {
|
||||
// if first path includes .DS_Store, ignore
|
||||
if event.paths.iter().any(|p| {
|
||||
p.to_str()
|
||||
.expect("Found non-UTF-8 path")
|
||||
.contains(".DS_Store")
|
||||
|| ignore_paths.contains(p)
|
||||
}) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -1,9 +1,4 @@
|
|||
use std::{
|
||||
collections::VecDeque,
|
||||
fs::{self, File},
|
||||
io::Read,
|
||||
path::PathBuf,
|
||||
};
|
||||
use std::{collections::VecDeque, fs::File, io::Read, path::PathBuf};
|
||||
|
||||
use tokio::task;
|
||||
|
||||
|
@ -64,8 +59,8 @@ const JOB_NAME: &str = "file_encryptor";
|
|||
|
||||
#[async_trait::async_trait]
|
||||
impl StatefulJob for FileEncryptorJob {
|
||||
type Data = FileEncryptorJobState;
|
||||
type Init = FileEncryptorJobInit;
|
||||
type Data = FileEncryptorJobState;
|
||||
type Step = FileEncryptorJobStep;
|
||||
|
||||
fn name(&self) -> &'static str {
|
||||
|
@ -136,8 +131,18 @@ impl StatefulJob for FileEncryptorJob {
|
|||
Ok,
|
||||
)?;
|
||||
|
||||
let mut reader = File::open(info.obj_path.clone())?;
|
||||
let mut writer = File::create(output_path)?;
|
||||
let _guard = ctx
|
||||
.library_ctx
|
||||
.location_manager()
|
||||
.temporary_ignore_events_for_path(
|
||||
state.init.location_id,
|
||||
ctx.library_ctx.clone(),
|
||||
&output_path,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut reader = task::block_in_place(|| File::open(&info.obj_path))?;
|
||||
let mut writer = task::block_in_place(|| File::create(output_path))?;
|
||||
|
||||
let master_key = generate_master_key();
|
||||
|
||||
|
@ -202,7 +207,7 @@ impl StatefulJob for FileEncryptorJob {
|
|||
.join("thumbnails")
|
||||
.join(object.cas_id + ".webp");
|
||||
|
||||
if fs::metadata(pvm_path.clone()).is_ok() {
|
||||
if tokio::fs::metadata(&pvm_path).await.is_ok() {
|
||||
let mut pvm_bytes = Vec::new();
|
||||
task::block_in_place(|| {
|
||||
let mut pvm_file = File::open(pvm_path)?;
|
||||
|
|
|
@ -7,12 +7,11 @@ module.exports = {
|
|||
find: /^(~\/.+)/,
|
||||
replacement: '$1',
|
||||
async customResolver(source, importer) {
|
||||
const [repo, filePath] = importer.split('/packages/');
|
||||
const [pkg] = filePath.split('/src/');
|
||||
const [pkg] = importer.split('/src/');
|
||||
|
||||
const sourcePath = source.substring(2);
|
||||
const [_, sourcePath] = source.split('~/');
|
||||
|
||||
const absolutePath = `${repo}/packages/${pkg}/src/${sourcePath}`;
|
||||
const absolutePath = `${pkg}/src/${sourcePath}`;
|
||||
|
||||
const folderItems = await fs.readdir(path.join(absolutePath, '../'));
|
||||
|
||||
|
|
Loading…
Reference in a new issue