mirror of
https://github.com/spacedriveapp/spacedrive
synced 2024-07-02 10:03:28 +00:00
Just saving up some WIP
This commit is contained in:
parent
4acca6494c
commit
b2fcd4ab5a
35
Cargo.lock
generated
35
Cargo.lock
generated
|
@ -292,6 +292,17 @@ version = "0.7.5"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e22d1f4b888c298a027c99dc9048015fac177587de20fc30232a057dfbe24a21"
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "1.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
|
||||
dependencies = [
|
||||
"concurrent-queue 2.2.0",
|
||||
"event-listener",
|
||||
"futures-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-io"
|
||||
version = "1.9.0"
|
||||
|
@ -299,7 +310,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "83e21f3a490c72b3b0cf44962180e60045de2925d8dff97918f7ee43c8f637c7"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"concurrent-queue",
|
||||
"concurrent-queue 1.2.4",
|
||||
"futures-lite",
|
||||
"libc",
|
||||
"log",
|
||||
|
@ -1183,6 +1194,15 @@ dependencies = [
|
|||
"cache-padded",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "connection-string"
|
||||
version = "0.1.14"
|
||||
|
@ -2137,6 +2157,12 @@ dependencies = [
|
|||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "2.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
|
||||
|
||||
[[package]]
|
||||
name = "exr"
|
||||
version = "1.5.2"
|
||||
|
@ -6661,6 +6687,7 @@ dependencies = [
|
|||
name = "sd-core"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"base64 0.21.0",
|
||||
|
@ -6671,6 +6698,7 @@ dependencies = [
|
|||
"enumflags2 0.7.5",
|
||||
"ffmpeg-next",
|
||||
"futures",
|
||||
"futures-concurrency",
|
||||
"globset",
|
||||
"hostname",
|
||||
"http-range",
|
||||
|
@ -6689,6 +6717,7 @@ dependencies = [
|
|||
"sd-crypto",
|
||||
"sd-ffmpeg",
|
||||
"sd-file-ext",
|
||||
"sd-location-watcher",
|
||||
"sd-p2p",
|
||||
"sd-sync",
|
||||
"serde",
|
||||
|
@ -6700,6 +6729,8 @@ dependencies = [
|
|||
"tempfile",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"tracing-test",
|
||||
|
@ -6769,6 +6800,7 @@ dependencies = [
|
|||
name = "sd-location-watcher"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"async-trait",
|
||||
"futures-concurrency",
|
||||
"notify",
|
||||
|
@ -8258,6 +8290,7 @@ dependencies = [
|
|||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
@ -10,23 +10,20 @@ rust-version = "1.68.1"
|
|||
|
||||
[features]
|
||||
default = []
|
||||
mobile = [
|
||||
] # This feature allows features to be disabled when the Core is running on mobile.
|
||||
ffmpeg = [
|
||||
"dep:ffmpeg-next",
|
||||
"dep:sd-ffmpeg",
|
||||
] # This feature controls whether the Spacedrive Core contains functionality which requires FFmpeg.
|
||||
location-watcher = ["dep:notify"]
|
||||
mobile = [] # This feature allows features to be disabled when the Core is running on mobile.
|
||||
ffmpeg = ["dep:ffmpeg-next", "dep:sd-ffmpeg"] # This feature controls whether the Spacedrive Core contains functionality which requires FFmpeg.
|
||||
location-watcher = ["dep:notify", "dep:sd-location-watcher"]
|
||||
sync-messages = []
|
||||
|
||||
[dependencies]
|
||||
sd-ffmpeg = { path = "../crates/ffmpeg", optional = true }
|
||||
sd-crypto = { path = "../crates/crypto", features = [
|
||||
"rspc",
|
||||
"serde",
|
||||
"keymanager",
|
||||
] }
|
||||
sd-ffmpeg = { path = "../crates/ffmpeg", optional = true }
|
||||
sd-file-ext = { path = "../crates/file-ext" }
|
||||
sd-location-watcher = { path = "../crates/location-watcher", optional = true }
|
||||
sd-sync = { path = "../crates/sync" }
|
||||
sd-p2p = { path = "../crates/p2p", features = ["specta", "serde"] }
|
||||
|
||||
|
@ -77,6 +74,10 @@ notify = { version = "5.0.0", default-features = false, features = [
|
|||
"macos_fsevent",
|
||||
], optional = true }
|
||||
static_assertions = "1.1.0"
|
||||
tokio-util = "0.7.7"
|
||||
tokio-stream = { version = "0.1.12", features = ["sync", "time"] }
|
||||
async-channel = "1.8.0"
|
||||
futures-concurrency = "7.2.0"
|
||||
|
||||
[target.'cfg(windows)'.dependencies.winapi-util]
|
||||
version = "0.1.5"
|
||||
|
|
|
@ -128,7 +128,7 @@ impl Node {
|
|||
let config = NodeConfigManager::new(data_dir.to_path_buf()).await?;
|
||||
|
||||
let jobs = JobManager::new();
|
||||
let location_manager = LocationManager::new();
|
||||
let location_manager = Arc::new(LocationManager::new());
|
||||
let secure_temp_keystore = SecureTempKeystore::new();
|
||||
let (p2p, mut p2p_rx) = P2PManager::new(config.clone()).await;
|
||||
|
||||
|
@ -137,7 +137,7 @@ impl Node {
|
|||
NodeContext {
|
||||
config: config.clone(),
|
||||
jobs: jobs.clone(),
|
||||
location_manager: location_manager.clone(),
|
||||
location_manager: Arc::clone(&location_manager),
|
||||
p2p: p2p.clone(),
|
||||
event_bus_tx: event_bus.0.clone(),
|
||||
},
|
||||
|
|
|
@ -1,21 +1,9 @@
|
|||
use crate::{library::Library, prisma::location};
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
path::PathBuf,
|
||||
time::Duration,
|
||||
use crate::{
|
||||
prisma::location,
|
||||
library::Library,
|
||||
};
|
||||
|
||||
use tokio::{fs, io::ErrorKind, sync::oneshot, time::sleep};
|
||||
use tracing::{error, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{watcher::LocationWatcher, LocationId, LocationManagerError};
|
||||
|
||||
type LibraryId = Uuid;
|
||||
type LocationAndLibraryKey = (LocationId, LibraryId);
|
||||
|
||||
const LOCATION_CHECK_INTERVAL: Duration = Duration::from_secs(5);
|
||||
use super::{LocationManagerError};
|
||||
|
||||
pub(super) async fn check_online(
|
||||
location: &location::Data,
|
||||
|
@ -45,217 +33,3 @@ pub(super) async fn check_online(
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) async fn location_check_sleep(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> (LocationId, Library) {
|
||||
sleep(LOCATION_CHECK_INTERVAL).await;
|
||||
(location_id, library)
|
||||
}
|
||||
|
||||
pub(super) fn watch_location(
|
||||
location: location::Data,
|
||||
library_id: LibraryId,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
let location_id = location.id;
|
||||
if let Some(mut watcher) = locations_unwatched.remove(&(location_id, library_id)) {
|
||||
if watcher.check_path(&location.path) {
|
||||
watcher.watch();
|
||||
} else {
|
||||
watcher.update_data(location, true);
|
||||
}
|
||||
|
||||
locations_watched.insert((location_id, library_id), watcher);
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn unwatch_location(
|
||||
location: location::Data,
|
||||
library_id: LibraryId,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
let location_id = location.id;
|
||||
if let Some(mut watcher) = locations_watched.remove(&(location_id, library_id)) {
|
||||
if watcher.check_path(&location.path) {
|
||||
watcher.unwatch();
|
||||
} else {
|
||||
watcher.update_data(location, false)
|
||||
}
|
||||
|
||||
locations_unwatched.insert((location_id, library_id), watcher);
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn drop_location(
|
||||
location_id: LocationId,
|
||||
library_id: LibraryId,
|
||||
message: &str,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
warn!("{message}: <id='{location_id}', library_id='{library_id}'>",);
|
||||
if let Some(mut watcher) = locations_watched.remove(&(location_id, library_id)) {
|
||||
watcher.unwatch();
|
||||
} else {
|
||||
locations_unwatched.remove(&(location_id, library_id));
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn get_location(location_id: i32, library: &Library) -> Option<location::Data> {
|
||||
library
|
||||
.db
|
||||
.location()
|
||||
.find_unique(location::id::equals(location_id))
|
||||
.exec()
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
error!("Failed to get location data from location_id: {:#?}", err);
|
||||
None
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) async fn handle_remove_location_request(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
to_remove: &mut HashSet<LocationAndLibraryKey>,
|
||||
) {
|
||||
let key = (location_id, library.id);
|
||||
if let Some(location) = get_location(location_id, &library).await {
|
||||
if location.node_id == library.node_local_id {
|
||||
unwatch_location(location, library.id, locations_watched, locations_unwatched);
|
||||
locations_unwatched.remove(&key);
|
||||
forced_unwatch.remove(&key);
|
||||
} else {
|
||||
drop_location(
|
||||
location_id,
|
||||
library.id,
|
||||
"Dropping location from location manager, because we don't have a `local_path` anymore",
|
||||
locations_watched,
|
||||
locations_unwatched
|
||||
);
|
||||
}
|
||||
} else {
|
||||
drop_location(
|
||||
location_id,
|
||||
library.id,
|
||||
"Removing location from manager, as we failed to fetch from db",
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
);
|
||||
}
|
||||
|
||||
// Marking location as removed, so we don't try to check it when the time comes
|
||||
to_remove.insert(key);
|
||||
|
||||
let _ = response_tx.send(Ok(())); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
||||
pub(super) async fn handle_stop_watcher_request(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
async fn inner(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
let key = (location_id, library.id);
|
||||
if !forced_unwatch.contains(&key) && locations_watched.contains_key(&key) {
|
||||
get_location(location_id, &library)
|
||||
.await
|
||||
.ok_or_else(|| LocationManagerError::FailedToStopOrReinitWatcher {
|
||||
reason: String::from("failed to fetch location from db"),
|
||||
})
|
||||
.map(|location| {
|
||||
unwatch_location(location, library.id, locations_watched, locations_unwatched);
|
||||
forced_unwatch.insert(key);
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let _ = response_tx.send(
|
||||
inner(
|
||||
location_id,
|
||||
library,
|
||||
forced_unwatch,
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
)
|
||||
.await,
|
||||
); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
||||
pub(super) async fn handle_reinit_watcher_request(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
async fn inner(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
let key = (location_id, library.id);
|
||||
if forced_unwatch.contains(&key) && locations_unwatched.contains_key(&key) {
|
||||
get_location(location_id, &library)
|
||||
.await
|
||||
.ok_or_else(|| LocationManagerError::FailedToStopOrReinitWatcher {
|
||||
reason: String::from("failed to fetch location from db"),
|
||||
})
|
||||
.map(|location| {
|
||||
watch_location(location, library.id, locations_watched, locations_unwatched);
|
||||
forced_unwatch.remove(&key);
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let _ = response_tx.send(
|
||||
inner(
|
||||
location_id,
|
||||
library,
|
||||
forced_unwatch,
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
)
|
||||
.await,
|
||||
); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
||||
pub(super) fn handle_ignore_path_request(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
path: PathBuf,
|
||||
ignore: bool,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
locations_watched: &HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
let _ = response_tx.send(
|
||||
if let Some(watcher) = locations_watched.get(&(location_id, library.id)) {
|
||||
watcher.ignore_path(path, ignore)
|
||||
} else {
|
||||
Ok(())
|
||||
},
|
||||
); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
|
261
core/src/location/manager/helpers_bkp.rs
Normal file
261
core/src/location/manager/helpers_bkp.rs
Normal file
|
@ -0,0 +1,261 @@
|
|||
use crate::{library::Library, prisma::location};
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
path::PathBuf,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use tokio::{fs, io::ErrorKind, sync::oneshot, time::sleep};
|
||||
use tracing::{error, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{watcher::LocationWatcher, LocationId, LocationManagerError};
|
||||
|
||||
type LibraryId = Uuid;
|
||||
type LocationAndLibraryKey = (LocationId, LibraryId);
|
||||
|
||||
const LOCATION_CHECK_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
pub(super) async fn check_online(
|
||||
location: &location::Data,
|
||||
library: &Library,
|
||||
) -> Result<bool, LocationManagerError> {
|
||||
let pub_id = Uuid::from_slice(&location.pub_id)?;
|
||||
|
||||
if location.node_id == library.node_local_id {
|
||||
match fs::metadata(&location.path).await {
|
||||
Ok(_) => {
|
||||
library.location_manager().add_online(pub_id).await;
|
||||
Ok(true)
|
||||
}
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => {
|
||||
library.location_manager().remove_online(&pub_id).await;
|
||||
Ok(false)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to check if location is online: {:#?}", e);
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// In this case, we don't have a `local_path`, but this location was marked as online
|
||||
library.location_manager().remove_online(&pub_id).await;
|
||||
Err(LocationManagerError::NonLocalLocation(location.id))
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn location_check_sleep(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> (LocationId, Library) {
|
||||
sleep(LOCATION_CHECK_INTERVAL).await;
|
||||
(location_id, library)
|
||||
}
|
||||
|
||||
pub(super) fn watch_location(
|
||||
location: location::Data,
|
||||
library_id: LibraryId,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
let location_id = location.id;
|
||||
if let Some(mut watcher) = locations_unwatched.remove(&(location_id, library_id)) {
|
||||
if watcher.check_path(&location.path) {
|
||||
watcher.watch();
|
||||
} else {
|
||||
watcher.update_data(location, true);
|
||||
}
|
||||
|
||||
locations_watched.insert((location_id, library_id), watcher);
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn unwatch_location(
|
||||
location: location::Data,
|
||||
library_id: LibraryId,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
let location_id = location.id;
|
||||
if let Some(mut watcher) = locations_watched.remove(&(location_id, library_id)) {
|
||||
if watcher.check_path(&location.path) {
|
||||
watcher.unwatch();
|
||||
} else {
|
||||
watcher.update_data(location, false)
|
||||
}
|
||||
|
||||
locations_unwatched.insert((location_id, library_id), watcher);
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn drop_location(
|
||||
location_id: LocationId,
|
||||
library_id: LibraryId,
|
||||
message: &str,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
warn!("{message}: <id='{location_id}', library_id='{library_id}'>",);
|
||||
if let Some(mut watcher) = locations_watched.remove(&(location_id, library_id)) {
|
||||
watcher.unwatch();
|
||||
} else {
|
||||
locations_unwatched.remove(&(location_id, library_id));
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn get_location(location_id: i32, library: &Library) -> Option<location::Data> {
|
||||
library
|
||||
.db
|
||||
.location()
|
||||
.find_unique(location::id::equals(location_id))
|
||||
.exec()
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
error!("Failed to get location data from location_id: {:#?}", err);
|
||||
None
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) async fn handle_remove_location_request(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
to_remove: &mut HashSet<LocationAndLibraryKey>,
|
||||
) {
|
||||
let key = (location_id, library.id);
|
||||
if let Some(location) = get_location(location_id, &library).await {
|
||||
if location.node_id == library.node_local_id {
|
||||
unwatch_location(location, library.id, locations_watched, locations_unwatched);
|
||||
locations_unwatched.remove(&key);
|
||||
forced_unwatch.remove(&key);
|
||||
} else {
|
||||
drop_location(
|
||||
location_id,
|
||||
library.id,
|
||||
"Dropping location from location manager, because we don't have a `local_path` anymore",
|
||||
locations_watched,
|
||||
locations_unwatched
|
||||
);
|
||||
}
|
||||
} else {
|
||||
drop_location(
|
||||
location_id,
|
||||
library.id,
|
||||
"Removing location from manager, as we failed to fetch from db",
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
);
|
||||
}
|
||||
|
||||
// Marking location as removed, so we don't try to check it when the time comes
|
||||
to_remove.insert(key);
|
||||
|
||||
let _ = response_tx.send(Ok(())); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
||||
pub(super) async fn handle_stop_watcher_request(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
async fn inner(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
let key = (location_id, library.id);
|
||||
if !forced_unwatch.contains(&key) && locations_watched.contains_key(&key) {
|
||||
get_location(location_id, &library)
|
||||
.await
|
||||
.ok_or_else(|| LocationManagerError::FailedToStopOrReinitWatcher {
|
||||
reason: String::from("failed to fetch location from db"),
|
||||
})
|
||||
.map(|location| {
|
||||
unwatch_location(location, library.id, locations_watched, locations_unwatched);
|
||||
forced_unwatch.insert(key);
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let _ = response_tx.send(
|
||||
inner(
|
||||
location_id,
|
||||
library,
|
||||
forced_unwatch,
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
)
|
||||
.await,
|
||||
); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
||||
pub(super) async fn handle_reinit_watcher_request(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
async fn inner(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
forced_unwatch: &mut HashSet<LocationAndLibraryKey>,
|
||||
locations_watched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
locations_unwatched: &mut HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
let key = (location_id, library.id);
|
||||
if forced_unwatch.contains(&key) && locations_unwatched.contains_key(&key) {
|
||||
get_location(location_id, &library)
|
||||
.await
|
||||
.ok_or_else(|| LocationManagerError::FailedToStopOrReinitWatcher {
|
||||
reason: String::from("failed to fetch location from db"),
|
||||
})
|
||||
.map(|location| {
|
||||
watch_location(location, library.id, locations_watched, locations_unwatched);
|
||||
forced_unwatch.remove(&key);
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let _ = response_tx.send(
|
||||
inner(
|
||||
location_id,
|
||||
library,
|
||||
forced_unwatch,
|
||||
locations_watched,
|
||||
locations_unwatched,
|
||||
)
|
||||
.await,
|
||||
); // ignore errors, we handle errors on receiver
|
||||
}
|
||||
|
||||
pub(super) fn handle_ignore_path_request(
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
path: PathBuf,
|
||||
ignore: bool,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
locations_watched: &HashMap<LocationAndLibraryKey, LocationWatcher>,
|
||||
) {
|
||||
let _ = response_tx.send(
|
||||
if let Some(watcher) = locations_watched.get(&(location_id, library.id)) {
|
||||
watcher.ignore_path(path, ignore)
|
||||
} else {
|
||||
Ok(())
|
||||
},
|
||||
); // ignore errors, we handle errors on receiver
|
||||
}
|
|
@ -1,588 +1,161 @@
|
|||
use crate::library::Library;
|
||||
|
||||
use std::{
|
||||
collections::BTreeSet,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
use std::collections::BTreeSet;
|
||||
use std::pin::{pin, Pin};
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::executor::block_on;
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
io,
|
||||
sync::{
|
||||
broadcast::{self, Receiver},
|
||||
oneshot, RwLock,
|
||||
},
|
||||
use futures::{Future, FutureExt};
|
||||
use futures_concurrency::future::Race;
|
||||
#[cfg(feature = "location-watcher")]
|
||||
use sd_location_watcher::{
|
||||
EventKind, INodeAndDevice, LocationPubId, LocationWatcher, LocationWatcherError, WatcherEvent,
|
||||
};
|
||||
#[cfg(not(feature = "location-watcher"))]
|
||||
type LocationPubId = Uuid;
|
||||
|
||||
use futures::{stream::FuturesUnordered, SinkExt, StreamExt};
|
||||
use futures_concurrency::stream::Merge;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::{sleep, Sleep};
|
||||
use tokio_stream::{self as stream, wrappers::ReceiverStream};
|
||||
use tokio_util::sync::{CancellationToken, DropGuard};
|
||||
use tracing::{debug, error};
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
use tokio::sync::mpsc;
|
||||
use uuid::Uuid;
|
||||
use super::LocationId;
|
||||
|
||||
use super::{file_path_helper::FilePathError, LocationId};
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
mod watcher;
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
mod helpers;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
#[allow(dead_code)]
|
||||
enum ManagementMessageAction {
|
||||
Add,
|
||||
Remove,
|
||||
}
|
||||
use helpers::{check_online, get_location};
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub struct LocationManagementMessage {
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
action: ManagementMessageAction,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
enum WatcherManagementMessageAction {
|
||||
Stop,
|
||||
Reinit,
|
||||
IgnoreEventsForPath { path: PathBuf, ignore: bool },
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub struct WatcherManagementMessage {
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
action: WatcherManagementMessageAction,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
enum ManagerMessage {
|
||||
Add {
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum LocationManagerError {
|
||||
#[cfg(feature = "location-watcher")]
|
||||
#[error("Unable to send location management message to location manager actor: (error: {0})")]
|
||||
ActorSendLocationError(#[from] mpsc::error::SendError<LocationManagementMessage>),
|
||||
pub enum LocationManagerError {}
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
#[error("Unable to send path to be ignored by watcher actor: (error: {0})")]
|
||||
ActorIgnorePathError(#[from] mpsc::error::SendError<watcher::IgnorePath>),
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
#[error("Unable to watcher management message to watcher manager actor: (error: {0})")]
|
||||
ActorIgnorePathMessageError(#[from] mpsc::error::SendError<WatcherManagementMessage>),
|
||||
|
||||
#[error("Unable to receive actor response: (error: {0})")]
|
||||
ActorResponseError(#[from] oneshot::error::RecvError),
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
#[error("Watcher error: (error: {0})")]
|
||||
WatcherError(#[from] notify::Error),
|
||||
|
||||
#[error("Failed to stop or reinit a watcher: {reason}")]
|
||||
FailedToStopOrReinitWatcher { reason: String },
|
||||
|
||||
#[error("Missing location from database: <id='{0}'>")]
|
||||
MissingLocation(LocationId),
|
||||
|
||||
#[error("Non local location: <id='{0}'>")]
|
||||
NonLocalLocation(LocationId),
|
||||
|
||||
#[error("Tried to update a non-existing file: <path='{0}'>")]
|
||||
UpdateNonExistingFile(PathBuf),
|
||||
#[error("Database error: {0}")]
|
||||
DatabaseError(#[from] prisma_client_rust::QueryError),
|
||||
#[error("I/O error: {0}")]
|
||||
IOError(#[from] io::Error),
|
||||
#[error("File path related error (error: {0})")]
|
||||
FilePathError(#[from] FilePathError),
|
||||
#[error("Corrupted location pub_id on database: (error: {0})")]
|
||||
CorruptedLocationPubId(#[from] uuid::Error),
|
||||
}
|
||||
|
||||
type OnlineLocations = BTreeSet<Vec<u8>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LocationManager {
|
||||
online_locations: RwLock<OnlineLocations>,
|
||||
pub online_tx: broadcast::Sender<OnlineLocations>,
|
||||
#[cfg(feature = "location-watcher")]
|
||||
location_management_tx: mpsc::Sender<LocationManagementMessage>,
|
||||
#[cfg(feature = "location-watcher")]
|
||||
watcher_management_tx: mpsc::Sender<WatcherManagementMessage>,
|
||||
stop_tx: Option<oneshot::Sender<()>>,
|
||||
online_locations: RwLock<BTreeSet<LocationPubId>>,
|
||||
location_management_tx: async_channel::Sender<ManagerMessage>,
|
||||
_cancel_loop: DropGuard,
|
||||
}
|
||||
|
||||
impl LocationManager {
|
||||
pub fn new() -> Arc<Self> {
|
||||
let online_tx = broadcast::channel(16).0;
|
||||
pub fn new() -> Self {
|
||||
let cancel_token = CancellationToken::new();
|
||||
let (location_management_tx, location_management_rx) = async_channel::bounded(128);
|
||||
|
||||
debug!("LocationManager initialized");
|
||||
let inner_cancel_token = cancel_token.child_token();
|
||||
tokio::spawn(async move {
|
||||
let location_management_rx = location_management_rx;
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
{
|
||||
let (location_management_tx, location_management_rx) = mpsc::channel(128);
|
||||
let (watcher_management_tx, watcher_management_rx) = mpsc::channel(128);
|
||||
let (stop_tx, stop_rx) = oneshot::channel();
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
tokio::spawn(Self::run_locations_checker(
|
||||
location_management_rx,
|
||||
watcher_management_rx,
|
||||
stop_rx,
|
||||
));
|
||||
|
||||
Arc::new(Self {
|
||||
online_locations: Default::default(),
|
||||
online_tx,
|
||||
location_management_tx,
|
||||
watcher_management_tx,
|
||||
stop_tx: Some(stop_tx),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "location-watcher"))]
|
||||
{
|
||||
tracing::warn!("Location watcher is disabled, locations will not be checked");
|
||||
Arc::new(Self {
|
||||
online_tx,
|
||||
online_locations: Default::default(),
|
||||
stop_tx: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[allow(unused_variables)]
|
||||
async fn location_management_message(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
action: ManagementMessageAction,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
#[cfg(feature = "location-watcher")]
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.location_management_tx
|
||||
.send(LocationManagementMessage {
|
||||
location_id,
|
||||
library,
|
||||
action,
|
||||
response_tx: tx,
|
||||
})
|
||||
.await?;
|
||||
|
||||
rx.await?
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "location-watcher"))]
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[allow(unused_variables)]
|
||||
async fn watcher_management_message(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
action: WatcherManagementMessageAction,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
#[cfg(feature = "location-watcher")]
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.watcher_management_tx
|
||||
.send(WatcherManagementMessage {
|
||||
location_id,
|
||||
library,
|
||||
action,
|
||||
response_tx: tx,
|
||||
})
|
||||
.await?;
|
||||
|
||||
rx.await?
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "location-watcher"))]
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
self.location_management_message(location_id, library, ManagementMessageAction::Add)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn remove(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
self.location_management_message(location_id, library, ManagementMessageAction::Remove)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn stop_watcher(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
self.watcher_management_message(location_id, library, WatcherManagementMessageAction::Stop)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn reinit_watcher(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
self.watcher_management_message(
|
||||
location_id,
|
||||
library,
|
||||
WatcherManagementMessageAction::Reinit,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn temporary_stop(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> Result<StopWatcherGuard, LocationManagerError> {
|
||||
self.stop_watcher(location_id, library.clone()).await?;
|
||||
|
||||
Ok(StopWatcherGuard {
|
||||
location_id,
|
||||
library: Some(library),
|
||||
manager: self,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn temporary_ignore_events_for_path(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
path: impl AsRef<Path>,
|
||||
) -> Result<IgnoreEventsForPathGuard, LocationManagerError> {
|
||||
let path = path.as_ref().to_path_buf();
|
||||
|
||||
self.watcher_management_message(
|
||||
location_id,
|
||||
library.clone(),
|
||||
WatcherManagementMessageAction::IgnoreEventsForPath {
|
||||
path: path.clone(),
|
||||
ignore: true,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(IgnoreEventsForPathGuard {
|
||||
location_id,
|
||||
library: Some(library),
|
||||
manager: self,
|
||||
path: Some(path),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
async fn run_locations_checker(
|
||||
mut location_management_rx: mpsc::Receiver<LocationManagementMessage>,
|
||||
mut watcher_management_rx: mpsc::Receiver<WatcherManagementMessage>,
|
||||
mut stop_rx: oneshot::Receiver<()>,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tokio::select;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use helpers::{
|
||||
check_online, drop_location, get_location, handle_ignore_path_request,
|
||||
handle_reinit_watcher_request, handle_remove_location_request,
|
||||
handle_stop_watcher_request, location_check_sleep, unwatch_location, watch_location,
|
||||
};
|
||||
use watcher::LocationWatcher;
|
||||
|
||||
let mut to_check_futures = FuturesUnordered::new();
|
||||
let mut to_remove = HashSet::new();
|
||||
let mut locations_watched = HashMap::new();
|
||||
let mut locations_unwatched = HashMap::new();
|
||||
let mut forced_unwatch = HashSet::new();
|
||||
|
||||
loop {
|
||||
select! {
|
||||
// Location management messages
|
||||
Some(LocationManagementMessage{
|
||||
location_id,
|
||||
library,
|
||||
action,
|
||||
response_tx
|
||||
}) = location_management_rx.recv() => {
|
||||
match action {
|
||||
|
||||
// To add a new location
|
||||
ManagementMessageAction::Add => {
|
||||
if let Some(location) = get_location(location_id, &library).await {
|
||||
let is_online = match check_online(&location, &library).await {
|
||||
Ok(is_online) => is_online,
|
||||
Err(e) => {
|
||||
error!("Error while checking online status of location {location_id}: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let _ = response_tx.send(
|
||||
LocationWatcher::new(location, library.clone())
|
||||
.await
|
||||
.map(|mut watcher| {
|
||||
if is_online {
|
||||
watcher.watch();
|
||||
locations_watched.insert(
|
||||
(location_id, library.id),
|
||||
watcher
|
||||
);
|
||||
} else {
|
||||
locations_unwatched.insert(
|
||||
(location_id, library.id),
|
||||
watcher
|
||||
);
|
||||
}
|
||||
|
||||
to_check_futures.push(
|
||||
location_check_sleep(location_id, library)
|
||||
);
|
||||
}
|
||||
)
|
||||
); // ignore errors, we handle errors on receiver
|
||||
} else {
|
||||
warn!(
|
||||
"Location not found in database to be watched: {}",
|
||||
location_id
|
||||
);
|
||||
}
|
||||
},
|
||||
|
||||
// To remove an location
|
||||
ManagementMessageAction::Remove => {
|
||||
handle_remove_location_request(
|
||||
location_id,
|
||||
library,
|
||||
response_tx,
|
||||
&mut forced_unwatch,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
&mut to_remove,
|
||||
).await;
|
||||
},
|
||||
}
|
||||
loop {
|
||||
if let Err(e) = tokio::spawn(management_loop(
|
||||
location_management_rx.clone(),
|
||||
inner_cancel_token.child_token(),
|
||||
))
|
||||
.await
|
||||
{
|
||||
error!("Location manager loop failed: {e}; Restarting...");
|
||||
}
|
||||
|
||||
// Watcher management messages
|
||||
Some(WatcherManagementMessage{
|
||||
location_id,
|
||||
library,
|
||||
action,
|
||||
response_tx,
|
||||
}) = watcher_management_rx.recv() => {
|
||||
match action {
|
||||
// To stop a watcher
|
||||
WatcherManagementMessageAction::Stop => {
|
||||
handle_stop_watcher_request(
|
||||
location_id,
|
||||
library,
|
||||
response_tx,
|
||||
&mut forced_unwatch,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
).await;
|
||||
},
|
||||
|
||||
// To reinit a stopped watcher
|
||||
WatcherManagementMessageAction::Reinit => {
|
||||
handle_reinit_watcher_request(
|
||||
location_id,
|
||||
library,
|
||||
response_tx,
|
||||
&mut forced_unwatch,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
).await;
|
||||
},
|
||||
|
||||
// To ignore or not events for a path
|
||||
WatcherManagementMessageAction::IgnoreEventsForPath { path, ignore } => {
|
||||
handle_ignore_path_request(
|
||||
location_id,
|
||||
library,
|
||||
path,
|
||||
ignore,
|
||||
response_tx,
|
||||
&locations_watched,
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Periodically checking locations
|
||||
Some((location_id, library)) = to_check_futures.next() => {
|
||||
let key = (location_id, library.id);
|
||||
|
||||
if to_remove.contains(&key) {
|
||||
// The time to check came for an already removed library, so we just ignore it
|
||||
to_remove.remove(&key);
|
||||
} else if let Some(location) = get_location(location_id, &library).await {
|
||||
if location.node_id == library.node_local_id {
|
||||
let is_online = match check_online(&location, &library).await {
|
||||
Ok(is_online) => is_online,
|
||||
Err(e) => {
|
||||
error!("Error while checking online status of location {location_id}: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if is_online
|
||||
&& !forced_unwatch.contains(&key)
|
||||
{
|
||||
watch_location(
|
||||
location,
|
||||
library.id,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
);
|
||||
} else {
|
||||
unwatch_location(
|
||||
location,
|
||||
library.id,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
);
|
||||
}
|
||||
to_check_futures.push(location_check_sleep(location_id, library));
|
||||
} else {
|
||||
drop_location(
|
||||
location_id,
|
||||
library.id,
|
||||
"Dropping location from location manager, because \
|
||||
it isn't a location in the current node",
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched
|
||||
);
|
||||
forced_unwatch.remove(&key);
|
||||
}
|
||||
} else {
|
||||
drop_location(
|
||||
location_id,
|
||||
library.id,
|
||||
"Removing location from manager, as we failed to fetch from db",
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
);
|
||||
forced_unwatch.remove(&key);
|
||||
}
|
||||
}
|
||||
|
||||
_ = &mut stop_rx => {
|
||||
info!("Stopping location manager");
|
||||
if inner_cancel_token.is_cancelled() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Self {
|
||||
online_locations: Default::default(),
|
||||
location_management_tx,
|
||||
_cancel_loop: cancel_token.drop_guard(),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn is_online(&self, id: &Uuid) -> bool {
|
||||
let online_locations = self.online_locations.read().await;
|
||||
online_locations.iter().any(|v| v == id.as_bytes())
|
||||
}
|
||||
|
||||
pub async fn get_online(&self) -> OnlineLocations {
|
||||
self.online_locations.read().await.clone()
|
||||
}
|
||||
|
||||
async fn broadcast_online(&self) {
|
||||
self.online_tx.send(self.get_online().await).ok();
|
||||
}
|
||||
|
||||
pub async fn add_online(&self, id: Uuid) {
|
||||
self.online_locations
|
||||
.write()
|
||||
.await
|
||||
.insert(id.as_bytes().to_vec());
|
||||
self.broadcast_online().await;
|
||||
}
|
||||
|
||||
pub async fn remove_online(&self, id: &Uuid) {
|
||||
let mut online_locations = self.online_locations.write().await;
|
||||
online_locations.retain(|v| v != id.as_bytes());
|
||||
self.broadcast_online().await;
|
||||
}
|
||||
|
||||
pub fn online_rx(&self) -> Receiver<OnlineLocations> {
|
||||
self.online_tx.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for LocationManager {
|
||||
fn drop(&mut self) {
|
||||
if let Some(stop_tx) = self.stop_tx.take() {
|
||||
if stop_tx.send(()).is_err() {
|
||||
error!("Failed to send stop signal to location manager");
|
||||
enum StreamMessage {
|
||||
Management(ManagerMessage),
|
||||
Check {
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
},
|
||||
Stop,
|
||||
}
|
||||
|
||||
async fn management_loop(
|
||||
location_management_rx: async_channel::Receiver<ManagerMessage>,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
|
||||
let mut online_checker = FuturesUnordered::<LocationCheck>::new();
|
||||
|
||||
let mut stream = (
|
||||
location_management_rx.map(StreamMessage::Management),
|
||||
stream::once(cancel.cancelled()).map(|_| StreamMessage::Stop),
|
||||
|
||||
)
|
||||
.merge();
|
||||
|
||||
loop {
|
||||
if let Some(msg) = (online_checker.next(), stream.next()).race().await {
|
||||
match msg {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while let Some(msg) = stream.next().await {
|
||||
match msg {
|
||||
StreamMessage::Management(msg) => match msg {
|
||||
ManagerMessage::Add {
|
||||
location_id,
|
||||
library,
|
||||
} => todo!(),
|
||||
},
|
||||
StreamMessage::Check {
|
||||
location_id,
|
||||
library,
|
||||
} => {
|
||||
online_checker.push(LocationCheck::new(location_id, library));
|
||||
}
|
||||
StreamMessage::Stop => {
|
||||
debug!("Stoped Location Manager event handler",);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use = "this `StopWatcherGuard` must be held for some time, so the watcher is stopped"]
|
||||
pub struct StopWatcherGuard<'m> {
|
||||
manager: &'m LocationManager,
|
||||
struct LocationCheck {
|
||||
location_id: LocationId,
|
||||
library: Option<Library>,
|
||||
library: Library,
|
||||
sleep: Sleep,
|
||||
}
|
||||
|
||||
impl Drop for StopWatcherGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
if cfg!(feature = "location-watcher") {
|
||||
// FIXME: change this Drop to async drop in the future
|
||||
if let Err(e) = block_on(
|
||||
self.manager
|
||||
.reinit_watcher(self.location_id, self.library.take().unwrap()),
|
||||
) {
|
||||
error!("Failed to reinit watcher on stop watcher guard drop: {e}");
|
||||
}
|
||||
impl LocationCheck {
|
||||
fn new(location_id: LocationId, library: Library) -> Self {
|
||||
Self {
|
||||
location_id,
|
||||
library,
|
||||
sleep: sleep(Duration::from_secs(5)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use = "this `IgnoreEventsForPathGuard` must be held for some time, so the watcher can ignore events for the desired path"]
|
||||
pub struct IgnoreEventsForPathGuard<'m> {
|
||||
manager: &'m LocationManager,
|
||||
path: Option<PathBuf>,
|
||||
location_id: LocationId,
|
||||
library: Option<Library>,
|
||||
}
|
||||
impl Future for LocationCheck {
|
||||
type Output = StreamMessage;
|
||||
|
||||
impl Drop for IgnoreEventsForPathGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
if cfg!(feature = "location-watcher") {
|
||||
// FIXME: change this Drop to async drop in the future
|
||||
if let Err(e) = block_on(self.manager.watcher_management_message(
|
||||
self.location_id,
|
||||
self.library.take().unwrap(),
|
||||
WatcherManagementMessageAction::IgnoreEventsForPath {
|
||||
path: self.path.take().unwrap(),
|
||||
ignore: false,
|
||||
},
|
||||
)) {
|
||||
error!("Failed to un-ignore path on watcher guard drop: {e}");
|
||||
}
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match pin!(self.sleep).poll(cx) {
|
||||
Poll::Ready(_) => Poll::Ready(StreamMessage::Check {
|
||||
location_id: self.location_id,
|
||||
library: self.library,
|
||||
}),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
588
core/src/location/manager/mod_bkp.rs
Normal file
588
core/src/location/manager/mod_bkp.rs
Normal file
|
@ -0,0 +1,588 @@
|
|||
use crate::library::Library;
|
||||
|
||||
use std::{
|
||||
collections::BTreeSet,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use futures::executor::block_on;
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
io,
|
||||
sync::{
|
||||
broadcast::{self, Receiver},
|
||||
oneshot, RwLock,
|
||||
},
|
||||
};
|
||||
use tracing::{debug, error};
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
use tokio::sync::mpsc;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{file_path_helper::FilePathError, LocationId};
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
mod watcher;
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
mod helpers;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
#[allow(dead_code)]
|
||||
enum ManagementMessageAction {
|
||||
Add,
|
||||
Remove,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub struct LocationManagementMessage {
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
action: ManagementMessageAction,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
enum WatcherManagementMessageAction {
|
||||
Stop,
|
||||
Reinit,
|
||||
IgnoreEventsForPath { path: PathBuf, ignore: bool },
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub struct WatcherManagementMessage {
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
action: WatcherManagementMessageAction,
|
||||
response_tx: oneshot::Sender<Result<(), LocationManagerError>>,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum LocationManagerError {
|
||||
#[cfg(feature = "location-watcher")]
|
||||
#[error("Unable to send location management message to location manager actor: (error: {0})")]
|
||||
ActorSendLocationError(#[from] mpsc::error::SendError<LocationManagementMessage>),
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
#[error("Unable to send path to be ignored by watcher actor: (error: {0})")]
|
||||
ActorIgnorePathError(#[from] mpsc::error::SendError<watcher::IgnorePath>),
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
#[error("Unable to watcher management message to watcher manager actor: (error: {0})")]
|
||||
ActorIgnorePathMessageError(#[from] mpsc::error::SendError<WatcherManagementMessage>),
|
||||
|
||||
#[error("Unable to receive actor response: (error: {0})")]
|
||||
ActorResponseError(#[from] oneshot::error::RecvError),
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
#[error("Watcher error: (error: {0})")]
|
||||
WatcherError(#[from] notify::Error),
|
||||
|
||||
#[error("Failed to stop or reinit a watcher: {reason}")]
|
||||
FailedToStopOrReinitWatcher { reason: String },
|
||||
|
||||
#[error("Missing location from database: <id='{0}'>")]
|
||||
MissingLocation(LocationId),
|
||||
|
||||
#[error("Non local location: <id='{0}'>")]
|
||||
NonLocalLocation(LocationId),
|
||||
|
||||
#[error("Tried to update a non-existing file: <path='{0}'>")]
|
||||
UpdateNonExistingFile(PathBuf),
|
||||
#[error("Database error: {0}")]
|
||||
DatabaseError(#[from] prisma_client_rust::QueryError),
|
||||
#[error("I/O error: {0}")]
|
||||
IOError(#[from] io::Error),
|
||||
#[error("File path related error (error: {0})")]
|
||||
FilePathError(#[from] FilePathError),
|
||||
#[error("Corrupted location pub_id on database: (error: {0})")]
|
||||
CorruptedLocationPubId(#[from] uuid::Error),
|
||||
}
|
||||
|
||||
type OnlineLocations = BTreeSet<Vec<u8>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LocationManager {
|
||||
online_locations: RwLock<OnlineLocations>,
|
||||
pub online_tx: broadcast::Sender<OnlineLocations>,
|
||||
#[cfg(feature = "location-watcher")]
|
||||
location_management_tx: mpsc::Sender<LocationManagementMessage>,
|
||||
#[cfg(feature = "location-watcher")]
|
||||
watcher_management_tx: mpsc::Sender<WatcherManagementMessage>,
|
||||
stop_tx: Option<oneshot::Sender<()>>,
|
||||
}
|
||||
|
||||
impl LocationManager {
|
||||
pub fn new() -> Arc<Self> {
|
||||
let online_tx = broadcast::channel(16).0;
|
||||
|
||||
debug!("LocationManager initialized");
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
{
|
||||
let (location_management_tx, location_management_rx) = mpsc::channel(128);
|
||||
let (watcher_management_tx, watcher_management_rx) = mpsc::channel(128);
|
||||
let (stop_tx, stop_rx) = oneshot::channel();
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
tokio::spawn(Self::run_locations_checker(
|
||||
location_management_rx,
|
||||
watcher_management_rx,
|
||||
stop_rx,
|
||||
));
|
||||
|
||||
Arc::new(Self {
|
||||
online_locations: Default::default(),
|
||||
online_tx,
|
||||
location_management_tx,
|
||||
watcher_management_tx,
|
||||
stop_tx: Some(stop_tx),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "location-watcher"))]
|
||||
{
|
||||
tracing::warn!("Location watcher is disabled, locations will not be checked");
|
||||
Arc::new(Self {
|
||||
online_tx,
|
||||
online_locations: Default::default(),
|
||||
stop_tx: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[allow(unused_variables)]
|
||||
async fn location_management_message(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
action: ManagementMessageAction,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
#[cfg(feature = "location-watcher")]
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.location_management_tx
|
||||
.send(LocationManagementMessage {
|
||||
location_id,
|
||||
library,
|
||||
action,
|
||||
response_tx: tx,
|
||||
})
|
||||
.await?;
|
||||
|
||||
rx.await?
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "location-watcher"))]
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[allow(unused_variables)]
|
||||
async fn watcher_management_message(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
action: WatcherManagementMessageAction,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
#[cfg(feature = "location-watcher")]
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.watcher_management_tx
|
||||
.send(WatcherManagementMessage {
|
||||
location_id,
|
||||
library,
|
||||
action,
|
||||
response_tx: tx,
|
||||
})
|
||||
.await?;
|
||||
|
||||
rx.await?
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "location-watcher"))]
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
self.location_management_message(location_id, library, ManagementMessageAction::Add)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn remove(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
self.location_management_message(location_id, library, ManagementMessageAction::Remove)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn stop_watcher(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
self.watcher_management_message(location_id, library, WatcherManagementMessageAction::Stop)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn reinit_watcher(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
self.watcher_management_message(
|
||||
location_id,
|
||||
library,
|
||||
WatcherManagementMessageAction::Reinit,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn temporary_stop(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
) -> Result<StopWatcherGuard, LocationManagerError> {
|
||||
self.stop_watcher(location_id, library.clone()).await?;
|
||||
|
||||
Ok(StopWatcherGuard {
|
||||
location_id,
|
||||
library: Some(library),
|
||||
manager: self,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn temporary_ignore_events_for_path(
|
||||
&self,
|
||||
location_id: LocationId,
|
||||
library: Library,
|
||||
path: impl AsRef<Path>,
|
||||
) -> Result<IgnoreEventsForPathGuard, LocationManagerError> {
|
||||
let path = path.as_ref().to_path_buf();
|
||||
|
||||
self.watcher_management_message(
|
||||
location_id,
|
||||
library.clone(),
|
||||
WatcherManagementMessageAction::IgnoreEventsForPath {
|
||||
path: path.clone(),
|
||||
ignore: true,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(IgnoreEventsForPathGuard {
|
||||
location_id,
|
||||
library: Some(library),
|
||||
manager: self,
|
||||
path: Some(path),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(feature = "location-watcher")]
|
||||
async fn run_locations_checker(
|
||||
mut location_management_rx: mpsc::Receiver<LocationManagementMessage>,
|
||||
mut watcher_management_rx: mpsc::Receiver<WatcherManagementMessage>,
|
||||
mut stop_rx: oneshot::Receiver<()>,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tokio::select;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use helpers::{
|
||||
check_online, drop_location, get_location, handle_ignore_path_request,
|
||||
handle_reinit_watcher_request, handle_remove_location_request,
|
||||
handle_stop_watcher_request, location_check_sleep, unwatch_location, watch_location,
|
||||
};
|
||||
use watcher::LocationWatcher;
|
||||
|
||||
let mut to_check_futures = FuturesUnordered::new();
|
||||
let mut to_remove = HashSet::new();
|
||||
let mut locations_watched = HashMap::new();
|
||||
let mut locations_unwatched = HashMap::new();
|
||||
let mut forced_unwatch = HashSet::new();
|
||||
|
||||
loop {
|
||||
select! {
|
||||
// Location management messages
|
||||
Some(LocationManagementMessage{
|
||||
location_id,
|
||||
library,
|
||||
action,
|
||||
response_tx
|
||||
}) = location_management_rx.recv() => {
|
||||
match action {
|
||||
|
||||
// To add a new location
|
||||
ManagementMessageAction::Add => {
|
||||
if let Some(location) = get_location(location_id, &library).await {
|
||||
let is_online = match check_online(&location, &library).await {
|
||||
Ok(is_online) => is_online,
|
||||
Err(e) => {
|
||||
error!("Error while checking online status of location {location_id}: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let _ = response_tx.send(
|
||||
LocationWatcher::new(location, library.clone())
|
||||
.await
|
||||
.map(|mut watcher| {
|
||||
if is_online {
|
||||
watcher.watch();
|
||||
locations_watched.insert(
|
||||
(location_id, library.id),
|
||||
watcher
|
||||
);
|
||||
} else {
|
||||
locations_unwatched.insert(
|
||||
(location_id, library.id),
|
||||
watcher
|
||||
);
|
||||
}
|
||||
|
||||
to_check_futures.push(
|
||||
location_check_sleep(location_id, library)
|
||||
);
|
||||
}
|
||||
)
|
||||
); // ignore errors, we handle errors on receiver
|
||||
} else {
|
||||
warn!(
|
||||
"Location not found in database to be watched: {}",
|
||||
location_id
|
||||
);
|
||||
}
|
||||
},
|
||||
|
||||
// To remove an location
|
||||
ManagementMessageAction::Remove => {
|
||||
handle_remove_location_request(
|
||||
location_id,
|
||||
library,
|
||||
response_tx,
|
||||
&mut forced_unwatch,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
&mut to_remove,
|
||||
).await;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Watcher management messages
|
||||
Some(WatcherManagementMessage{
|
||||
location_id,
|
||||
library,
|
||||
action,
|
||||
response_tx,
|
||||
}) = watcher_management_rx.recv() => {
|
||||
match action {
|
||||
// To stop a watcher
|
||||
WatcherManagementMessageAction::Stop => {
|
||||
handle_stop_watcher_request(
|
||||
location_id,
|
||||
library,
|
||||
response_tx,
|
||||
&mut forced_unwatch,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
).await;
|
||||
},
|
||||
|
||||
// To reinit a stopped watcher
|
||||
WatcherManagementMessageAction::Reinit => {
|
||||
handle_reinit_watcher_request(
|
||||
location_id,
|
||||
library,
|
||||
response_tx,
|
||||
&mut forced_unwatch,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
).await;
|
||||
},
|
||||
|
||||
// To ignore or not events for a path
|
||||
WatcherManagementMessageAction::IgnoreEventsForPath { path, ignore } => {
|
||||
handle_ignore_path_request(
|
||||
location_id,
|
||||
library,
|
||||
path,
|
||||
ignore,
|
||||
response_tx,
|
||||
&locations_watched,
|
||||
);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Periodically checking locations
|
||||
Some((location_id, library)) = to_check_futures.next() => {
|
||||
let key = (location_id, library.id);
|
||||
|
||||
if to_remove.contains(&key) {
|
||||
// The time to check came for an already removed library, so we just ignore it
|
||||
to_remove.remove(&key);
|
||||
} else if let Some(location) = get_location(location_id, &library).await {
|
||||
if location.node_id == library.node_local_id {
|
||||
let is_online = match check_online(&location, &library).await {
|
||||
Ok(is_online) => is_online,
|
||||
Err(e) => {
|
||||
error!("Error while checking online status of location {location_id}: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if is_online
|
||||
&& !forced_unwatch.contains(&key)
|
||||
{
|
||||
watch_location(
|
||||
location,
|
||||
library.id,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
);
|
||||
} else {
|
||||
unwatch_location(
|
||||
location,
|
||||
library.id,
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
);
|
||||
}
|
||||
to_check_futures.push(location_check_sleep(location_id, library));
|
||||
} else {
|
||||
drop_location(
|
||||
location_id,
|
||||
library.id,
|
||||
"Dropping location from location manager, because \
|
||||
it isn't a location in the current node",
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched
|
||||
);
|
||||
forced_unwatch.remove(&key);
|
||||
}
|
||||
} else {
|
||||
drop_location(
|
||||
location_id,
|
||||
library.id,
|
||||
"Removing location from manager, as we failed to fetch from db",
|
||||
&mut locations_watched,
|
||||
&mut locations_unwatched,
|
||||
);
|
||||
forced_unwatch.remove(&key);
|
||||
}
|
||||
}
|
||||
|
||||
_ = &mut stop_rx => {
|
||||
info!("Stopping location manager");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn is_online(&self, id: &Uuid) -> bool {
|
||||
let online_locations = self.online_locations.read().await;
|
||||
online_locations.iter().any(|v| v == id.as_bytes())
|
||||
}
|
||||
|
||||
pub async fn get_online(&self) -> OnlineLocations {
|
||||
self.online_locations.read().await.clone()
|
||||
}
|
||||
|
||||
async fn broadcast_online(&self) {
|
||||
self.online_tx.send(self.get_online().await).ok();
|
||||
}
|
||||
|
||||
pub async fn add_online(&self, id: Uuid) {
|
||||
self.online_locations
|
||||
.write()
|
||||
.await
|
||||
.insert(id.as_bytes().to_vec());
|
||||
self.broadcast_online().await;
|
||||
}
|
||||
|
||||
pub async fn remove_online(&self, id: &Uuid) {
|
||||
let mut online_locations = self.online_locations.write().await;
|
||||
online_locations.retain(|v| v != id.as_bytes());
|
||||
self.broadcast_online().await;
|
||||
}
|
||||
|
||||
pub fn online_rx(&self) -> Receiver<OnlineLocations> {
|
||||
self.online_tx.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for LocationManager {
|
||||
fn drop(&mut self) {
|
||||
if let Some(stop_tx) = self.stop_tx.take() {
|
||||
if stop_tx.send(()).is_err() {
|
||||
error!("Failed to send stop signal to location manager");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use = "this `StopWatcherGuard` must be held for some time, so the watcher is stopped"]
|
||||
pub struct StopWatcherGuard<'m> {
|
||||
manager: &'m LocationManager,
|
||||
location_id: LocationId,
|
||||
library: Option<Library>,
|
||||
}
|
||||
|
||||
impl Drop for StopWatcherGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
if cfg!(feature = "location-watcher") {
|
||||
// FIXME: change this Drop to async drop in the future
|
||||
if let Err(e) = block_on(
|
||||
self.manager
|
||||
.reinit_watcher(self.location_id, self.library.take().unwrap()),
|
||||
) {
|
||||
error!("Failed to reinit watcher on stop watcher guard drop: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use = "this `IgnoreEventsForPathGuard` must be held for some time, so the watcher can ignore events for the desired path"]
|
||||
pub struct IgnoreEventsForPathGuard<'m> {
|
||||
manager: &'m LocationManager,
|
||||
path: Option<PathBuf>,
|
||||
location_id: LocationId,
|
||||
library: Option<Library>,
|
||||
}
|
||||
|
||||
impl Drop for IgnoreEventsForPathGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
if cfg!(feature = "location-watcher") {
|
||||
// FIXME: change this Drop to async drop in the future
|
||||
if let Err(e) = block_on(self.manager.watcher_management_message(
|
||||
self.location_id,
|
||||
self.library.take().unwrap(),
|
||||
WatcherManagementMessageAction::IgnoreEventsForPath {
|
||||
path: self.path.take().unwrap(),
|
||||
ignore: false,
|
||||
},
|
||||
)) {
|
||||
error!("Failed to un-ignore path on watcher guard drop: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,6 +6,7 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
async-channel = "1.8.0"
|
||||
async-trait = "0.1.68"
|
||||
futures-concurrency = "7.2.0"
|
||||
notify = { version = "5.1.0", features = ["macos_fsevent"] }
|
||||
|
|
67
crates/location-watcher/src/event.rs
Normal file
67
crates/location-watcher/src/event.rs
Normal file
|
@ -0,0 +1,67 @@
|
|||
use std::{fmt::Display, path::PathBuf};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use notify::Event as NotifyEvent;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
use super::{INodeAndDevice, LocationPubId, LocationWatcherError};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WatcherEvent {
|
||||
pub location_pub_id: LocationPubId,
|
||||
pub kind: EventKind,
|
||||
}
|
||||
|
||||
impl Display for WatcherEvent {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"WatcherEvent {{ location_pub_id: {}, kind: {} }}",
|
||||
self.location_pub_id, self.kind
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum EventKind {
|
||||
Create(PathBuf),
|
||||
Update(PathBuf),
|
||||
Rename { from: PathBuf, to: PathBuf },
|
||||
Delete(PathBuf),
|
||||
}
|
||||
|
||||
impl Display for EventKind {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
EventKind::Create(path) => write!(f, "Create({})", path.display()),
|
||||
EventKind::Update(path) => write!(f, "Update({})", path.display()),
|
||||
EventKind::Rename { from, to } => {
|
||||
write!(f, "Rename({} -> {})", from.display(), to.display())
|
||||
}
|
||||
EventKind::Delete(path) => write!(f, "Delete({})", path.display()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait to abstract away how each OS emits file system events
|
||||
#[async_trait]
|
||||
pub(super) trait EventHandler {
|
||||
fn new(
|
||||
location_pub_id: LocationPubId,
|
||||
inode_and_device_requester_tx: mpsc::Sender<(
|
||||
LocationPubId,
|
||||
PathBuf,
|
||||
oneshot::Sender<INodeAndDevice>,
|
||||
)>,
|
||||
events_to_emit_tx: mpsc::Sender<WatcherEvent>,
|
||||
) -> Self
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
/// Handle a file system event.
|
||||
async fn handle_event(&mut self, event: NotifyEvent) -> Result<(), LocationWatcherError>;
|
||||
|
||||
/// As Event Handlers have some inner state, from time to time we need to call this tick method
|
||||
/// so the event handler can update its state.
|
||||
async fn tick(&mut self) -> Result<(), Vec<LocationWatcherError>>;
|
||||
}
|
|
@ -1,83 +1,9 @@
|
|||
/******************************************************************************
|
||||
* Some annotations on how file system events are emitted on each OS *
|
||||
*******************************************************************************
|
||||
* Events dispatched on Linux: *
|
||||
* Create File: *
|
||||
* 1) EventKind::Create(CreateKind::File) *
|
||||
* 2) EventKind::Modify(ModifyKind::Metadata(MetadataKind::Any)) *
|
||||
* or EventKind::Modify(ModifyKind::Data(DataChange::Any)) *
|
||||
* 3) EventKind::Access(AccessKind::Close(AccessMode::Write))) *
|
||||
* Create Directory: *
|
||||
* 1) EventKind::Create(CreateKind::Folder) *
|
||||
* Update File: *
|
||||
* 1) EventKind::Modify(ModifyKind::Data(DataChange::Any)) *
|
||||
* 2) EventKind::Access(AccessKind::Close(AccessMode::Write))) *
|
||||
* Update File (rename): *
|
||||
* 1) EventKind::Modify(ModifyKind::Name(RenameMode::From)) *
|
||||
* 2) EventKind::Modify(ModifyKind::Name(RenameMode::To)) *
|
||||
* 3) EventKind::Modify(ModifyKind::Name(RenameMode::Both)) *
|
||||
* Update Directory (rename): *
|
||||
* 1) EventKind::Modify(ModifyKind::Name(RenameMode::From)) *
|
||||
* 2) EventKind::Modify(ModifyKind::Name(RenameMode::To)) *
|
||||
* 3) EventKind::Modify(ModifyKind::Name(RenameMode::Both)) *
|
||||
* Delete File: *
|
||||
* 1) EventKind::Remove(RemoveKind::File) *
|
||||
* Delete Directory: *
|
||||
* 1) EventKind::Remove(RemoveKind::Folder) *
|
||||
* *
|
||||
* Events dispatched on MacOS: *
|
||||
* Create File: *
|
||||
* 1) EventKind::Create(CreateKind::File) *
|
||||
* 2) EventKind::Modify(ModifyKind::Data(DataChange::Content)) *
|
||||
* Create Directory: *
|
||||
* 1) EventKind::Create(CreateKind::Folder) *
|
||||
* Update File: *
|
||||
* 1) EventKind::Modify(ModifyKind::Data(DataChange::Content)) *
|
||||
* Update File (rename): *
|
||||
* 1) EventKind::Modify(ModifyKind::Name(RenameMode::Any)) -- From *
|
||||
* 2) EventKind::Modify(ModifyKind::Name(RenameMode::Any)) -- To *
|
||||
* Update Directory (rename): *
|
||||
* 1) EventKind::Modify(ModifyKind::Name(RenameMode::Any)) -- From *
|
||||
* 2) EventKind::Modify(ModifyKind::Name(RenameMode::Any)) -- To *
|
||||
* Delete File: *
|
||||
* 1) EventKind::Remove(RemoveKind::File) *
|
||||
* Delete Directory: *
|
||||
* 1) EventKind::Remove(RemoveKind::Folder) *
|
||||
* *
|
||||
* Events dispatched on Windows: *
|
||||
* Create File: *
|
||||
* 1) EventKind::Create(CreateKind::Any) *
|
||||
* 2) EventKind::Modify(ModifyKind::Any) *
|
||||
* Create Directory: *
|
||||
* 1) EventKind::Create(CreateKind::Any) *
|
||||
* Update File: *
|
||||
* 1) EventKind::Modify(ModifyKind::Any) *
|
||||
* Update File (rename): *
|
||||
* 1) EventKind::Modify(ModifyKind::Name(RenameMode::From)) *
|
||||
* 2) EventKind::Modify(ModifyKind::Name(RenameMode::To)) *
|
||||
* Update Directory (rename): *
|
||||
* 1) EventKind::Modify(ModifyKind::Name(RenameMode::From)) *
|
||||
* 2) EventKind::Modify(ModifyKind::Name(RenameMode::To)) *
|
||||
* Delete File: *
|
||||
* 1) EventKind::Remove(RemoveKind::Any) *
|
||||
* Delete Directory: *
|
||||
* 1) EventKind::Remove(RemoveKind::Any) *
|
||||
* *
|
||||
* Events dispatched on Android: *
|
||||
* TODO *
|
||||
* *
|
||||
* Events dispatched on iOS: *
|
||||
* TODO *
|
||||
* *
|
||||
******************************************************************************/
|
||||
|
||||
use std::{
|
||||
fmt::Display,
|
||||
path::{Path, PathBuf},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures_concurrency::{future::Join, stream::Merge};
|
||||
use notify::{Config, Event as NotifyEvent, RecommendedWatcher, RecursiveMode, Watcher};
|
||||
use thiserror::Error;
|
||||
|
@ -85,15 +11,12 @@ use tokio::{
|
|||
sync::{mpsc, oneshot},
|
||||
time::{interval_at, Instant, MissedTickBehavior},
|
||||
};
|
||||
use tokio_stream::{
|
||||
self as stream,
|
||||
wrappers::{IntervalStream, UnboundedReceiverStream},
|
||||
StreamExt,
|
||||
};
|
||||
use tokio_stream::{self as stream, wrappers::IntervalStream, StreamExt};
|
||||
use tokio_util::sync::{CancellationToken, DropGuard};
|
||||
use tracing::{debug, error, trace, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
mod event;
|
||||
mod linux;
|
||||
mod macos;
|
||||
mod windows;
|
||||
|
@ -116,46 +39,17 @@ const HUNDRED_MILLIS: Duration = Duration::from_millis(100);
|
|||
|
||||
pub type LocationPubId = Uuid;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WatcherEvent {
|
||||
pub location_pub_id: LocationPubId,
|
||||
pub kind: EventKind,
|
||||
}
|
||||
pub use event::{EventKind, WatcherEvent};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum EventKind {
|
||||
Create(PathBuf),
|
||||
Rename { from: PathBuf, to: PathBuf },
|
||||
Delete(PathBuf),
|
||||
}
|
||||
|
||||
/// A trait to abstract away how each OS emits file system events
|
||||
#[async_trait]
|
||||
trait EventHandler {
|
||||
fn new(
|
||||
location_pub_id: LocationPubId,
|
||||
inode_and_device_requester_tx: mpsc::Sender<(
|
||||
LocationPubId,
|
||||
PathBuf,
|
||||
oneshot::Sender<INodeAndDevice>,
|
||||
)>,
|
||||
events_to_emit_tx: mpsc::Sender<WatcherEvent>,
|
||||
) -> Self
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
/// Handle a file system event.
|
||||
async fn handle_event(&mut self, event: NotifyEvent) -> Result<(), LocationWatcherError>;
|
||||
|
||||
/// As Event Handlers have some inner state, from time to time we need to call this tick method
|
||||
/// so the event handler can update its state.
|
||||
async fn tick(&mut self);
|
||||
}
|
||||
use event::EventHandler;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum LocationWatcherError {
|
||||
#[error("Notify Error: {0}")]
|
||||
Notify(#[from] notify::Error),
|
||||
|
||||
#[error("Failed to emit event back to Location Manager: {0}")]
|
||||
EmitEvent(WatcherEvent),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -196,13 +90,14 @@ impl LocationWatcher {
|
|||
check_location_online_tx: mpsc::Sender<(LocationPubId, oneshot::Sender<bool>)>,
|
||||
event_to_emit_tx: mpsc::Sender<WatcherEvent>,
|
||||
) -> Result<Self, LocationWatcherError> {
|
||||
let (fs_events_tx, fs_events_rx) = mpsc::unbounded_channel();
|
||||
let (fs_events_tx, fs_events_rx) = async_channel::unbounded();
|
||||
let cancel_token = CancellationToken::new();
|
||||
|
||||
let watcher = RecommendedWatcher::new(
|
||||
move |result| {
|
||||
if !fs_events_tx.is_closed() {
|
||||
if fs_events_tx.send(result).is_err() {
|
||||
// It's alright to use `send_blocking` here because we're using a unbounded channel
|
||||
if fs_events_tx.send_blocking(result).is_err() {
|
||||
error!(
|
||||
"Unable to send watcher event to location manager for location: <id='{}'>",
|
||||
location_pub_id
|
||||
|
@ -218,17 +113,41 @@ impl LocationWatcher {
|
|||
Config::default(),
|
||||
)?;
|
||||
|
||||
tokio::spawn(watch_events_loop(
|
||||
location_pub_id,
|
||||
InnerWatchingLoopChannels {
|
||||
check_paths_rejection_tx,
|
||||
inode_and_device_requester_tx,
|
||||
check_location_online_tx,
|
||||
fs_events_rx,
|
||||
event_to_emit_tx,
|
||||
},
|
||||
cancel_token.child_token(),
|
||||
));
|
||||
let inner_cancel_token = cancel_token.child_token();
|
||||
tokio::spawn(async move {
|
||||
let check_paths_rejection_tx = check_paths_rejection_tx;
|
||||
let inode_and_device_requester_tx = inode_and_device_requester_tx;
|
||||
let check_location_online_tx = check_location_online_tx;
|
||||
let fs_events_rx = fs_events_rx;
|
||||
let event_to_emit_tx = event_to_emit_tx;
|
||||
// FIXME: Change this to use scoped tasks to avoid clonning the Senders and just
|
||||
|
||||
// This outer loop guarantees that the inner loop will always be running, except in case of cancellation
|
||||
loop {
|
||||
if let Err(e) = tokio::spawn(watch_events_loop(
|
||||
location_pub_id,
|
||||
InnerWatchingLoopChannels {
|
||||
check_paths_rejection_tx: check_paths_rejection_tx.clone(),
|
||||
inode_and_device_requester_tx: inode_and_device_requester_tx.clone(),
|
||||
check_location_online_tx: check_location_online_tx.clone(),
|
||||
fs_events_rx: fs_events_rx.clone(),
|
||||
event_to_emit_tx: event_to_emit_tx.clone(),
|
||||
},
|
||||
inner_cancel_token.child_token(),
|
||||
))
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
"Error while watching location: <pub_id='{location_pub_id}'>; \
|
||||
Error: {e}; \
|
||||
Restarting the watching loop...",
|
||||
);
|
||||
}
|
||||
if inner_cancel_token.is_cancelled() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
location_pub_id,
|
||||
|
@ -327,7 +246,7 @@ struct InnerWatchingLoopChannels {
|
|||
inode_and_device_requester_tx:
|
||||
mpsc::Sender<(LocationPubId, PathBuf, oneshot::Sender<INodeAndDevice>)>,
|
||||
check_location_online_tx: mpsc::Sender<(LocationPubId, oneshot::Sender<bool>)>,
|
||||
fs_events_rx: mpsc::UnboundedReceiver<notify::Result<NotifyEvent>>,
|
||||
fs_events_rx: async_channel::Receiver<notify::Result<NotifyEvent>>,
|
||||
event_to_emit_tx: mpsc::Sender<WatcherEvent>,
|
||||
}
|
||||
|
||||
|
@ -340,7 +259,7 @@ async fn watch_events_loop(
|
|||
fs_events_rx,
|
||||
event_to_emit_tx,
|
||||
}: InnerWatchingLoopChannels,
|
||||
cancel: CancellationToken,
|
||||
cancel_token: CancellationToken,
|
||||
) {
|
||||
let mut event_handler = Handler::new(
|
||||
location_pub_id,
|
||||
|
@ -359,9 +278,9 @@ async fn watch_events_loop(
|
|||
}
|
||||
|
||||
let mut stream = (
|
||||
UnboundedReceiverStream::new(fs_events_rx).map(StreamMessage::MaybeEvent),
|
||||
fs_events_rx.map(StreamMessage::MaybeEvent),
|
||||
IntervalStream::new(handler_ticker).map(|_| StreamMessage::Tick),
|
||||
stream::once(cancel.cancelled()).map(|_| StreamMessage::Stop),
|
||||
stream::once(cancel_token.cancelled()).map(|_| StreamMessage::Stop),
|
||||
)
|
||||
.merge();
|
||||
|
||||
|
@ -377,15 +296,26 @@ async fn watch_events_loop(
|
|||
)
|
||||
.await
|
||||
{
|
||||
error!("Failed to handle location file system event: <pub_id='{location_pub_id}'>; Error: {e}");
|
||||
error!(
|
||||
"Failed to handle location file system event: \
|
||||
<pub_id='{location_pub_id}'>; Error: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
StreamMessage::MaybeEvent(Err(e)) => error!("Watcher error: {e}"),
|
||||
StreamMessage::Tick => event_handler.tick().await,
|
||||
StreamMessage::Tick => {
|
||||
if let Err(errors) = event_handler.tick().await {
|
||||
for e in errors {
|
||||
error!(
|
||||
"Failed to handle location file system event on `tick`: \
|
||||
<pub_id='{location_pub_id}'>; Error: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
StreamMessage::Stop => {
|
||||
debug!(
|
||||
"Stop Location Manager event handler for location: <pub_id='{}'>",
|
||||
location_pub_id
|
||||
"Stopped Location Watcher event handler for location: <pub_id='{location_pub_id}'>",
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,225 @@
|
|||
/*!
|
||||
* Linux has the best behaving file system events, with just some small caveats:
|
||||
* When we move files or directories, we receive 3 events: Rename From, Rename To and Rename Both.
|
||||
* But when we move a file or directory to the outside from the watched location, we just receive
|
||||
* the Rename From event, so we have to keep track of all rename events to match them against each
|
||||
* other. If we have dangling Rename From events, we have to remove them after some time.
|
||||
* Aside from that, when a directory is moved to our watched location from the outside, we receive
|
||||
* a Create Dir event, this one is actually ok at least.
|
||||
*
|
||||
*
|
||||
* ## Events dispatched on Linux:
|
||||
* - Create File:
|
||||
* 1) `EventKind::Create(CreateKind::File)`
|
||||
* 2) `EventKind::Modify(ModifyKind::Metadata(MetadataKind::Any))`
|
||||
* or `EventKind::Modify(ModifyKind::Data(DataChange::Any))`
|
||||
* 3) `EventKind::Access(AccessKind::Close(AccessMode::Write)))`
|
||||
* - Create Directory:
|
||||
* 1) `EventKind::Create(CreateKind::Folder)`
|
||||
* - Update File:
|
||||
* 1) `EventKind::Modify(ModifyKind::Data(DataChange::Any))`
|
||||
* 2) `EventKind::Access(AccessKind::Close(AccessMode::Write)))`
|
||||
* - Update File (rename):
|
||||
* 1) `EventKind::Modify(ModifyKind::Name(RenameMode::From))`
|
||||
* 2) `EventKind::Modify(ModifyKind::Name(RenameMode::To))`
|
||||
* 3) `EventKind::Modify(ModifyKind::Name(RenameMode::Both))`
|
||||
* - Update Directory (rename):
|
||||
* 1) `EventKind::Modify(ModifyKind::Name(RenameMode::From))`
|
||||
* 2) `EventKind::Modify(ModifyKind::Name(RenameMode::To))`
|
||||
* 3) `EventKind::Modify(ModifyKind::Name(RenameMode::Both))`
|
||||
* - Delete File:
|
||||
* 1) `EventKind::Remove(RemoveKind::File)`
|
||||
* - Delete Directory:
|
||||
* 1) `EventKind::Remove(RemoveKind::Folder)`
|
||||
*/
|
||||
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
path::PathBuf,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use notify::{
|
||||
event::{AccessKind, AccessMode, CreateKind, ModifyKind, RenameMode},
|
||||
Event, EventKind as NotifyEventKind,
|
||||
};
|
||||
use tokio::{
|
||||
sync::{mpsc, oneshot},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::trace;
|
||||
|
||||
use super::{
|
||||
event::{EventHandler, EventKind, WatcherEvent},
|
||||
INodeAndDevice, LocationPubId, LocationWatcherError, HUNDRED_MILLIS,
|
||||
};
|
||||
|
||||
pub(super) struct LinuxEventHandler {
|
||||
location_pub_id: LocationPubId,
|
||||
events_to_emit_tx: mpsc::Sender<WatcherEvent>,
|
||||
last_check_rename: Instant,
|
||||
rename_from: HashMap<PathBuf, Instant>,
|
||||
rename_from_buffer: Vec<(PathBuf, Instant)>,
|
||||
recently_created_files: BTreeMap<PathBuf, Instant>,
|
||||
recently_renamed_from: BTreeMap<PathBuf, Instant>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventHandler for LinuxEventHandler {
|
||||
fn new(
|
||||
location_pub_id: LocationPubId,
|
||||
// Linux file system events are well behaved and doesn't need to check file's inode and device
|
||||
_inode_and_device_tx: mpsc::Sender<(
|
||||
LocationPubId,
|
||||
PathBuf,
|
||||
oneshot::Sender<INodeAndDevice>,
|
||||
)>,
|
||||
events_to_emit_tx: mpsc::Sender<WatcherEvent>,
|
||||
) -> Self
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
Self {
|
||||
location_pub_id,
|
||||
events_to_emit_tx,
|
||||
last_check_rename: Instant::now(),
|
||||
rename_from: HashMap::new(),
|
||||
rename_from_buffer: Vec::new(),
|
||||
recently_created_files: BTreeMap::new(),
|
||||
recently_renamed_from: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_event(&mut self, event: Event) -> Result<(), LocationWatcherError> {
|
||||
trace!("Received Linux event: {:#?}", event);
|
||||
|
||||
let Event {
|
||||
kind, mut paths, ..
|
||||
} = event;
|
||||
|
||||
match kind {
|
||||
NotifyEventKind::Create(CreateKind::File) => {
|
||||
let path = paths.remove(0);
|
||||
self.recently_created_files
|
||||
.insert(path.clone(), Instant::now());
|
||||
self.events_to_emit_tx
|
||||
.send(WatcherEvent {
|
||||
location_pub_id: self.location_pub_id,
|
||||
kind: EventKind::Create(path),
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
NotifyEventKind::Access(AccessKind::Close(AccessMode::Write)) => {
|
||||
// If a file was closed with write mode, then it was updated or created,
|
||||
// so we check if it was created recently
|
||||
let path = paths.remove(0);
|
||||
if !self.recently_created_files.contains_key(&path) {
|
||||
self.events_to_emit_tx
|
||||
.send(WatcherEvent {
|
||||
location_pub_id: self.location_pub_id,
|
||||
kind: EventKind::Update(path),
|
||||
})
|
||||
.await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
NotifyEventKind::Create(CreateKind::Folder) => {
|
||||
self.events_to_emit_tx
|
||||
.send(WatcherEvent {
|
||||
location_pub_id: self.location_pub_id,
|
||||
kind: EventKind::Create(paths.remove(0)),
|
||||
})
|
||||
.await
|
||||
}
|
||||
NotifyEventKind::Modify(ModifyKind::Name(RenameMode::From)) => {
|
||||
// Just in case we can't garantee that we receive the Rename From event before the
|
||||
// Rename Both event. Just a safeguard
|
||||
if self.recently_renamed_from.remove(&paths[0]).is_none() {
|
||||
self.rename_from.insert(paths.remove(0), Instant::now());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
NotifyEventKind::Modify(ModifyKind::Name(RenameMode::Both)) => {
|
||||
let to = paths.remove(1);
|
||||
let from = paths.remove(0);
|
||||
self.rename_from.remove(&from);
|
||||
|
||||
self.recently_renamed_from
|
||||
.insert(from.clone(), Instant::now());
|
||||
self.events_to_emit_tx
|
||||
.send(WatcherEvent {
|
||||
location_pub_id: self.location_pub_id,
|
||||
kind: EventKind::Rename { from, to },
|
||||
})
|
||||
.await
|
||||
}
|
||||
NotifyEventKind::Remove(_) => {
|
||||
self.events_to_emit_tx
|
||||
.send(WatcherEvent {
|
||||
location_pub_id: self.location_pub_id,
|
||||
kind: EventKind::Delete(paths.remove(0)),
|
||||
})
|
||||
.await
|
||||
}
|
||||
other_event_kind => {
|
||||
trace!("Other Linux event that we don't handle for now: {other_event_kind:#?}");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
.map_err(|e| LocationWatcherError::EmitEvent(e.0))
|
||||
}
|
||||
|
||||
async fn tick(&mut self) -> Result<(), Vec<LocationWatcherError>> {
|
||||
if self.last_check_rename.elapsed() > HUNDRED_MILLIS {
|
||||
self.last_check_rename = Instant::now();
|
||||
|
||||
self.recently_renamed_from
|
||||
.retain(|_, instant| instant.elapsed() < HUNDRED_MILLIS);
|
||||
|
||||
self.recently_created_files
|
||||
.retain(|_, instant| instant.elapsed() < HUNDRED_MILLIS);
|
||||
|
||||
self.handle_rename_from_eviction().await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LinuxEventHandler {
|
||||
async fn handle_rename_from_eviction(&mut self) -> Result<(), Vec<LocationWatcherError>> {
|
||||
self.rename_from_buffer.clear();
|
||||
|
||||
let mut errors = vec![];
|
||||
for (path, instant) in self.rename_from.drain() {
|
||||
if instant.elapsed() > HUNDRED_MILLIS {
|
||||
if let Err(e) = self
|
||||
.events_to_emit_tx
|
||||
.send(WatcherEvent {
|
||||
location_pub_id: self.location_pub_id,
|
||||
kind: EventKind::Delete(path),
|
||||
})
|
||||
.await
|
||||
{
|
||||
errors.push(LocationWatcherError::EmitEvent(e.0));
|
||||
}
|
||||
} else {
|
||||
self.rename_from_buffer.push((path, instant));
|
||||
}
|
||||
}
|
||||
|
||||
for (path, instant) in self.rename_from_buffer.drain(..) {
|
||||
self.rename_from.insert(path, instant);
|
||||
}
|
||||
|
||||
if errors.is_empty() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(errors)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,15 +1,36 @@
|
|||
//! On MacOS, we use the FSEvents backend of notify-rs and Rename events are pretty complicated;
|
||||
//! There are just (ModifyKind::Name(RenameMode::Any) events and nothing else.
|
||||
//! This means that we have to link the old path with the new path to know which file was renamed.
|
||||
//! But you can't forget that renames events aren't always the case that I file name was modified,
|
||||
//! but its path was modified. So we have to check if the file was moved. When a file is moved
|
||||
//! inside the same location, we received 2 events: one for the old path and one for the new path.
|
||||
//! But when a file is moved to another location, we only receive the old path event... This
|
||||
//! way we have to handle like a file deletion, and the same applies for when a file is moved to our
|
||||
//! current location from anywhere else, we just receive the new path rename event, which means a
|
||||
//! creation.
|
||||
/*!
|
||||
* On MacOS, we use the FSEvents backend of notify-rs and Rename events are pretty complicated;
|
||||
* There are just (ModifyKind::Name(RenameMode::Any) events and nothing else.
|
||||
* This means that we have to link the old path with the new path to know which file was renamed.
|
||||
* But you can't forget that renames events aren't always the case that I file name was modified,
|
||||
* but its path was modified. So we have to check if the file was moved. When a file is moved
|
||||
* inside the same location, we received 2 events: one for the old path and one for the new path.
|
||||
* But when a file is moved to another location, we only receive the old path event... This
|
||||
* way we have to handle like a file deletion, and the same applies for when a file is moved to our
|
||||
* current location from anywhere else, we just receive the new path rename event, which means a
|
||||
* creation.
|
||||
*
|
||||
* ## Events dispatched on MacOS:
|
||||
* - Create File:
|
||||
* 1) `EventKind::Create(CreateKind::File)`
|
||||
* 2) `EventKind::Modify(ModifyKind::Data(DataChange::Content))`
|
||||
* - Create Directory:
|
||||
* 1) `EventKind::Create(CreateKind::Folder)`
|
||||
* - Update File:
|
||||
* 1) `EventKind::Modify(ModifyKind::Data(DataChange::Content))`
|
||||
* - Update File (rename):
|
||||
* 1) `EventKind::Modify(ModifyKind::Name(RenameMode::Any))` --> From
|
||||
* 2) `EventKind::Modify(ModifyKind::Name(RenameMode::Any))` --> To
|
||||
* - Update Directory (rename):
|
||||
* 1) `EventKind::Modify(ModifyKind::Name(RenameMode::Any))` --> From
|
||||
* 2) `EventKind::Modify(ModifyKind::Name(RenameMode::Any))` --> To
|
||||
* - Delete File:
|
||||
* 1) `EventKind::Remove(RemoveKind::File)`
|
||||
* - Delete Directory:
|
||||
* 1) `EventKind::Remove(RemoveKind::Folder)`
|
||||
*/
|
||||
|
||||
use std::{
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
path::PathBuf,
|
||||
};
|
||||
|
@ -22,7 +43,8 @@ use tokio::{
|
|||
};
|
||||
|
||||
use super::{
|
||||
EventHandler, INodeAndDevice, InstantAndPath, LocationPubId, LocationWatcherError, WatcherEvent,
|
||||
event::{EventHandler, EventKind, WatcherEvent},
|
||||
INodeAndDevice, InstantAndPath, LocationPubId, LocationWatcherError,
|
||||
};
|
||||
|
||||
pub(super) struct MacOsEventHandler {
|
||||
|
@ -70,7 +92,7 @@ impl EventHandler for MacOsEventHandler {
|
|||
todo!()
|
||||
}
|
||||
|
||||
async fn tick(&mut self) {
|
||||
async fn tick(&mut self) -> Result<(), Vec<LocationWatcherError>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
|
16
package.json
16
package.json
|
@ -31,24 +31,24 @@
|
|||
},
|
||||
"devDependencies": {
|
||||
"@babel/plugin-syntax-import-assertions": "^7.20.0",
|
||||
"@cspell/dict-rust": "^2.0.1",
|
||||
"@cspell/dict-typescript": "^2.0.2",
|
||||
"@cspell/dict-rust": "^4.0.1",
|
||||
"@cspell/dict-typescript": "^3.1.1",
|
||||
"@trivago/prettier-plugin-sort-imports": "^4.1.1",
|
||||
"cspell": "^6.12.0",
|
||||
"markdown-link-check": "^3.10.3",
|
||||
"prettier": "^2.8.7",
|
||||
"prettier-plugin-tailwindcss": "^0.2.6",
|
||||
"rimraf": "^4.3",
|
||||
"turbo": "^1.5.5",
|
||||
"turbo-ignore": "^0.3.0",
|
||||
"typescript": "^4.9.4"
|
||||
"prettier-plugin-tailwindcss": "^0.2.7",
|
||||
"rimraf": "^5.0.0",
|
||||
"turbo": "^1.9.1",
|
||||
"turbo-ignore": "^1.9.1",
|
||||
"typescript": "^5.0.4"
|
||||
},
|
||||
"overrides": {
|
||||
"vite-plugin-svgr": "https://github.com/spacedriveapp/vite-plugin-svgr#cb4195b69849429cdb18d1f12381676bf9196a84",
|
||||
"@types/node": "^18.0.0"
|
||||
},
|
||||
"engines": {
|
||||
"pnpm": ">=7.14.0",
|
||||
"pnpm": ">=8.0.0",
|
||||
"npm": "pnpm",
|
||||
"yarn": "pnpm",
|
||||
"node": ">=18.0.0"
|
||||
|
|
14248
pnpm-lock.yaml
14248
pnpm-lock.yaml
File diff suppressed because it is too large
Load diff
Loading…
Reference in a new issue