diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 000000000..3900ea085 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,80 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "lldb", + "request": "launch", + "name": "Tauri Development Debug", + "cargo": { + "args": [ + "build", + "--manifest-path=./apps/desktop/src-tauri/Cargo.toml", + "--no-default-features" + ], + "problemMatcher": "$rustc", + }, + "sourceLanguages": [ + "rust" + ], + "preLaunchTask": "ui:dev" + }, + { + "type": "lldb", + "request": "launch", + "name": "Tauri Production Debug", + "cargo": { + "args": [ + "build", + "--release", + "--manifest-path=./apps/desktop/src-tauri/Cargo.toml" + ], + "problemMatcher": "$rustc", + }, + "sourceLanguages": [ + "rust" + ], + "preLaunchTask": "ui:build" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'sd-core'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=sd-core" + ], + "filter": { + "name": "sd-core", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'sd-crypto'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=sd-crypto" + ], + "filter": { + "name": "sd-crypto", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + ] +} \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 71c0c420f..a573f2387 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -4,20 +4,28 @@ { "type": "cargo", "command": "clippy", - "problemMatcher": ["$rustc"], + "problemMatcher": [ + "$rustc" + ], "group": { "kind": "build", "isDefault": true }, "label": "rust: cargo clippy", - "args": 
["--all-targets", "--all-features", "--all"] + "args": [ + "--all-targets", + "--all-features", + "--all" + ] }, { "type": "npm", "script": "prep", "label": "pnpm: prep", "group": "none", - "problemMatcher": ["$rustc"] + "problemMatcher": [ + "$rustc" + ] }, { "type": "shell", @@ -32,7 +40,12 @@ }, "isBackground": true, "command": "pnpm", - "args": ["desktop", "vite", "--clearScreen=false", "--mode=development"], + "args": [ + "desktop", + "vite", + "--clearScreen=false", + "--mode=development" + ], "runOptions": { "instanceLimit": 1 } @@ -42,31 +55,49 @@ "label": "ui:build", "problemMatcher": "$tsc", "command": "pnpm", - "args": ["desktop", "vite", "build"] + "args": [ + "desktop", + "vite", + "build" + ] }, { "type": "cargo", "command": "run", - "args": ["--manifest-path=./apps/desktop/src-tauri/Cargo.toml", "--no-default-features"], + "args": [ + "--manifest-path=./apps/desktop/src-tauri/Cargo.toml", + "--no-default-features" + ], "env": { - "RUST_BACKTRACE": "short" // Change this if you want more or less backtrace + "RUST_BACKTRACE": "short" }, - "problemMatcher": ["$rustc"], + "problemMatcher": [ + "$rustc" + ], "group": "build", "label": "rust: run spacedrive", - "dependsOn": ["ui:dev"] + "dependsOn": [ + "ui:dev" + ] }, { "type": "cargo", "command": "run", - "args": ["--manifest-path=./apps/desktop/src-tauri/Cargo.toml", "--release"], + "args": [ + "--manifest-path=./apps/desktop/src-tauri/Cargo.toml", + "--release" + ], "env": { - "RUST_BACKTRACE": "short" // Change this if you want more or less backtrace + "RUST_BACKTRACE": "short" }, - "problemMatcher": ["$rustc"], + "problemMatcher": [ + "$rustc" + ], "group": "build", "label": "rust: run spacedrive release", - "dependsOn": ["ui:build"] + "dependsOn": [ + "ui:build" + ] } ] -} +} \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index fe810390a..b438add4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6626,7 +6626,7 @@ version = "0.1.0" dependencies = [ "async-stream", "async-trait", - 
"base64 0.13.1", + "base64 0.21.0", "blake3", "chrono", "ctor", @@ -6658,6 +6658,7 @@ dependencies = [ "serde_json", "serde_with 2.2.0", "specta", + "static_assertions", "sysinfo", "tempfile", "thiserror", @@ -6668,6 +6669,7 @@ dependencies = [ "uhlc", "uuid 1.2.1", "webp", + "winapi-util", ] [[package]] @@ -7673,9 +7675,9 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.26.4" +version = "0.28.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7890fff842b8db56f2033ebee8f6efe1921475c3830c115995552914fb967580" +checksum = "f69e0d827cce279e61c2f3399eb789271a8f136d8245edef70f06e3c9601a670" dependencies = [ "cfg-if", "core-foundation-sys", diff --git a/core/Cargo.toml b/core/Cargo.toml index 4c59e274e..3a10782f7 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -6,7 +6,7 @@ authors = ["Spacedrive Technology Inc."] license = "GNU GENERAL PUBLIC LICENSE" repository = "https://github.com/spacedriveapp/spacedrive" edition = "2021" -rust-version = "1.67.0" +rust-version = "1.68.1" [features] default = [] @@ -42,7 +42,7 @@ tokio = { workspace = true, features = [ "time", ] } -base64 = "0.13.0" +base64 = "0.21.0" serde = { version = "1.0", features = ["derive"] } chrono = { version = "0.4.22", features = ["serde"] } serde_json = "1.0" @@ -53,7 +53,7 @@ rmp-serde = "^1.1.1" blake3 = "1.3.1" hostname = "0.3.1" uuid = { version = "1.1.2", features = ["v4", "serde"] } -sysinfo = "0.26.4" +sysinfo = "0.28.3" thiserror = "1.0.37" include_dir = { version = "0.7.2", features = ["glob"] } async-trait = "^0.1.57" @@ -76,6 +76,10 @@ ffmpeg-next = { version = "5.1.1", optional = true, features = [] } notify = { version = "5.0.0", default-features = false, features = [ "macos_fsevent", ], optional = true } +static_assertions = "1.1.0" + +[target.'cfg(windows)'.dependencies.winapi-util] +version = "0.1.5" [dev-dependencies] tempfile = "^3.3.0" diff --git a/core/prisma/migrations/20230210031123_init/migration.sql 
b/core/prisma/migrations/20230210031123_init/migration.sql index 4e986f47d..72f9a7ed4 100644 --- a/core/prisma/migrations/20230210031123_init/migration.sql +++ b/core/prisma/migrations/20230210031123_init/migration.sql @@ -64,8 +64,8 @@ CREATE TABLE "location" ( "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, "pub_id" BLOB NOT NULL, "node_id" INTEGER NOT NULL, - "name" TEXT, - "local_path" TEXT, + "name" TEXT NOT NULL, + "path" TEXT NOT NULL, "total_capacity" INTEGER, "available_capacity" INTEGER, "is_archived" BOOLEAN NOT NULL DEFAULT false, @@ -80,10 +80,7 @@ CREATE TABLE "location" ( CREATE TABLE "object" ( "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, "pub_id" BLOB NOT NULL, - "name" TEXT, - "extension" TEXT COLLATE NOCASE, "kind" INTEGER NOT NULL DEFAULT 0, - "size_in_bytes" TEXT NOT NULL DEFAULT '0', "key_id" INTEGER, "hidden" BOOLEAN NOT NULL DEFAULT false, "favorite" BOOLEAN NOT NULL DEFAULT false, @@ -94,8 +91,6 @@ CREATE TABLE "object" ( "ipfs_id" TEXT, "note" TEXT, "date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - "date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - "date_indexed" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, CONSTRAINT "object_key_id_fkey" FOREIGN KEY ("key_id") REFERENCES "key" ("id") ON DELETE SET NULL ON UPDATE CASCADE ); @@ -109,6 +104,9 @@ CREATE TABLE "file_path" ( "materialized_path" TEXT NOT NULL, "name" TEXT NOT NULL, "extension" TEXT COLLATE NOCASE NOT NULL, + "size_in_bytes" TEXT NOT NULL DEFAULT '0', + "inode" BLOB NOT NULL, + "device" BLOB NOT NULL, "object_id" INTEGER, "parent_id" INTEGER, "key_id" INTEGER, @@ -320,6 +318,9 @@ CREATE INDEX "file_path_location_id_idx" ON "file_path"("location_id"); -- CreateIndex CREATE UNIQUE INDEX "file_path_location_id_materialized_path_name_extension_key" ON "file_path"("location_id", "materialized_path", "name", "extension"); +-- CreateIndex +CREATE UNIQUE INDEX "file_path_location_id_inode_device_key" ON "file_path"("location_id", "inode", "device"); + -- 
CreateIndex CREATE UNIQUE INDEX "file_conflict_original_object_id_key" ON "file_conflict"("original_object_id"); diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index 90e0698ca..ba05531ed 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -134,6 +134,11 @@ model FilePath { name String extension String + size_in_bytes String @default("0") + + inode Bytes // This is actually an unsigned 64 bit integer, but we don't have this type in SQLite + device Bytes // This is actually an unsigned 64 bit integer, but we don't have this type in SQLite + // the unique Object for this file path object_id Int? object Object? @relation(fields: [object_id], references: [id], onDelete: Restrict) @@ -155,6 +160,7 @@ model FilePath { @@id([location_id, id]) @@unique([location_id, materialized_path, name, extension]) + @@unique([location_id, inode, device]) @@index([location_id]) @@map("file_path") } @@ -163,12 +169,8 @@ model FilePath { model Object { id Int @id @default(autoincrement()) pub_id Bytes @unique - // basic metadata - name String? - // Must have 'COLLATE NOCASE' in migration - extension String? kind Int @default(0) - size_in_bytes String @default("0") + key_id Int? // handy ways to mark an object hidden Boolean @default(false) @@ -188,10 +190,6 @@ model Object { note String? 
// the original known creation date of this object date_created DateTime @default(now()) - // the last time this object was modified - date_modified DateTime @default(now()) - // when this object was first indexed - date_indexed DateTime @default(now()) tags TagOnObject[] labels LabelOnObject[] diff --git a/core/src/api/locations.rs b/core/src/api/locations.rs index 7e9fa676e..3795a54bd 100644 --- a/core/src/api/locations.rs +++ b/core/src/api/locations.rs @@ -8,7 +8,7 @@ use crate::{ prisma::{file_path, indexer_rule, indexer_rules_in_location, location, object, tag}, }; -use std::path::PathBuf; +use std::path::{PathBuf, MAIN_SEPARATOR, MAIN_SEPARATOR_STR}; use rspc::{self, ErrorCode, RouterBuilderLike, Type}; use serde::{Deserialize, Serialize}; @@ -87,8 +87,8 @@ pub(crate) fn mount() -> impl RouterBuilderLike { .await? .ok_or(LocationError::IdNotFound(args.location_id))?; - if !args.path.ends_with('/') { - args.path += "/"; + if !args.path.ends_with(MAIN_SEPARATOR) { + args.path += MAIN_SEPARATOR_STR; } let directory = db diff --git a/core/src/api/tags.rs b/core/src/api/tags.rs index 4e797af05..f6ff03cf9 100644 --- a/core/src/api/tags.rs +++ b/core/src/api/tags.rs @@ -51,20 +51,7 @@ pub(crate) fn mount() -> RouterBuilder { let mut items = Vec::with_capacity(objects.len()); - for mut object in objects { - // sorry brendan - // grab the first path and tac on the name - let oldest_path = &object.file_paths[0]; - object.name = Some(oldest_path.name.clone()); - object.extension = if oldest_path.extension.is_empty() { - None - } else { - Some(oldest_path.extension.clone()) - }; - // a long term fix for this would be to have the indexer give the Object - // a name and extension, sacrificing its own and only store newly found Path - // names that differ from the Object name - + for object in objects { let cas_id = object .file_paths .iter() diff --git a/core/src/custom_uri.rs b/core/src/custom_uri.rs index 2cd46df65..637a56370 100644 --- a/core/src/custom_uri.rs +++ 
b/core/src/custom_uri.rs @@ -1,4 +1,4 @@ -use crate::{prisma::file_path, Node}; +use crate::{location::file_path_helper::MaterializedPath, prisma::file_path, Node}; use std::{ cmp::min, @@ -125,7 +125,10 @@ async fn handle_file( .ok_or_else(|| HandleCustomUriError::NotFound("object"))?; let lru_entry = ( - Path::new(&file_path.location.path).join(&file_path.materialized_path), + Path::new(&file_path.location.path).join(&MaterializedPath::from(( + location_id, + &file_path.materialized_path, + ))), file_path.extension, ); FILE_METADATA_CACHE.insert(lru_cache_key, lru_entry.clone()); diff --git a/core/src/location/error.rs b/core/src/location/error.rs index c72a62bb6..7c214b14c 100644 --- a/core/src/location/error.rs +++ b/core/src/location/error.rs @@ -1,5 +1,3 @@ -use crate::LocationManagerError; - use std::path::PathBuf; use rspc::{self, ErrorCode}; @@ -7,7 +5,9 @@ use thiserror::Error; use tokio::io; use uuid::Uuid; -use super::{file_path_helper::FilePathError, metadata::LocationMetadataError}; +use super::{ + file_path_helper::FilePathError, manager::LocationManagerError, metadata::LocationMetadataError, +}; /// Error type for location related errors #[derive(Error, Debug)] diff --git a/core/src/location/file_path_helper.rs b/core/src/location/file_path_helper.rs index 43d7274e5..1fd357930 100644 --- a/core/src/location/file_path_helper.rs +++ b/core/src/location/file_path_helper.rs @@ -1,11 +1,10 @@ -use crate::prisma::{ - file_path::{self, FindMany}, - location, PrismaClient, -}; +use crate::prisma::{file_path, location, PrismaClient}; use std::{ + borrow::Cow, fmt::{Display, Formatter}, - path::{Path, PathBuf}, + fs::Metadata, + path::{Path, PathBuf, MAIN_SEPARATOR, MAIN_SEPARATOR_STR}, }; use dashmap::{mapref::entry::Entry, DashMap}; @@ -47,15 +46,15 @@ file_path::select!(file_path_just_materialized_path_cas_id { file_path::include!(file_path_with_object { object }); #[derive(Serialize, Deserialize, Clone, Debug)] -pub struct MaterializedPath { - pub(super) 
materialized_path: String, +pub struct MaterializedPath<'a> { + pub(super) materialized_path: Cow<'a, str>, pub(super) is_dir: bool, pub(super) location_id: LocationId, - pub(super) name: String, - pub(super) extension: String, + pub(super) name: Cow<'a, str>, + pub(super) extension: Cow<'a, str>, } -impl MaterializedPath { +impl MaterializedPath<'static> { pub fn new( location_id: LocationId, location_path: impl AsRef, @@ -63,14 +62,15 @@ impl MaterializedPath { is_dir: bool, ) -> Result { let full_path = full_path.as_ref(); - let mut materialized_path = + let mut materialized_path = format!( + "{MAIN_SEPARATOR_STR}{}", extract_materialized_path(location_id, location_path, full_path)? .to_str() .expect("Found non-UTF-8 path") - .to_string(); + ); - if is_dir && !materialized_path.ends_with('/') { - materialized_path += "/"; + if is_dir && !materialized_path.ends_with(MAIN_SEPARATOR) { + materialized_path += MAIN_SEPARATOR_STR; } let extension = if !is_dir { @@ -96,78 +96,117 @@ impl MaterializedPath { }; Ok(Self { - materialized_path, + materialized_path: Cow::Owned(materialized_path), is_dir, location_id, - name: Self::prepare_name(full_path), - extension, + name: Cow::Owned(Self::prepare_name(full_path).to_string()), + extension: Cow::Owned(extension), }) } +} +impl<'a> MaterializedPath<'a> { pub fn location_id(&self) -> LocationId { self.location_id } - fn prepare_name(path: &Path) -> String { + fn prepare_name(path: &Path) -> &str { // Not using `impl AsRef` here because it's an private method - path.file_name() + path.file_stem() .unwrap_or_default() .to_str() .unwrap_or_default() - .to_string() } pub fn parent(&self) -> Self { - let parent_path = Path::new(&self.materialized_path) + let parent_path = Path::new(self.materialized_path.as_ref()) .parent() - .unwrap_or_else(|| Path::new("/")); + .unwrap_or_else(|| Path::new(MAIN_SEPARATOR_STR)); let mut parent_path_str = parent_path .to_str() .unwrap() // SAFETY: This unwrap is ok because this path was a valid 
UTF-8 String before .to_string(); - if !parent_path_str.ends_with('/') { - parent_path_str += "/"; + if !parent_path_str.ends_with(MAIN_SEPARATOR) { + parent_path_str += MAIN_SEPARATOR_STR; } Self { - materialized_path: parent_path_str, + materialized_path: Cow::Owned(parent_path_str), is_dir: true, location_id: self.location_id, // NOTE: This way we don't use the same name for "/" `file_path`, that uses the location // name in the database, check later if this is a problem - name: Self::prepare_name(parent_path), - extension: String::new(), + name: Cow::Owned(Self::prepare_name(parent_path).to_string()), + extension: Cow::Owned(String::new()), } } } -impl From for String { +impl<'a, S: AsRef + 'a> From<(LocationId, &'a S)> for MaterializedPath<'a> { + fn from((location_id, materialized_path): (LocationId, &'a S)) -> Self { + let materialized_path = materialized_path.as_ref(); + let is_dir = materialized_path.ends_with(MAIN_SEPARATOR); + let length = materialized_path.len(); + + let (name, extension) = if length == 1 { + // The case for the root path + (materialized_path, "") + } else if is_dir { + let first_name_char = materialized_path[..(length - 1)] + .rfind(MAIN_SEPARATOR) + .unwrap_or(0) + 1; + (&materialized_path[first_name_char..(length - 1)], "") + } else { + let first_name_char = materialized_path.rfind(MAIN_SEPARATOR).unwrap_or(0) + 1; + if let Some(last_dot_relative_idx) = materialized_path[first_name_char..].rfind('.') { + let last_dot_idx = first_name_char + last_dot_relative_idx; + ( + &materialized_path[first_name_char..last_dot_idx], + &materialized_path[last_dot_idx + 1..], + ) + } else { + (&materialized_path[first_name_char..], "") + } + }; + + Self { + materialized_path: Cow::Borrowed(materialized_path), + location_id, + is_dir, + name: Cow::Borrowed(name), + extension: Cow::Borrowed(extension), + } + } +} + +impl From> for String { fn from(path: MaterializedPath) -> Self { - path.materialized_path + path.materialized_path.into_owned() } } 
-impl From<&MaterializedPath> for String { +impl From<&MaterializedPath<'_>> for String { fn from(path: &MaterializedPath) -> Self { - path.materialized_path.clone() + path.materialized_path.to_string() } } -impl AsRef for MaterializedPath { +impl AsRef for MaterializedPath<'_> { fn as_ref(&self) -> &str { self.materialized_path.as_ref() } } -impl AsRef for MaterializedPath { +impl AsRef for &MaterializedPath<'_> { fn as_ref(&self) -> &Path { - Path::new(&self.materialized_path) + // Skipping / because it's not a valid path to be joined + Path::new(&self.materialized_path[1..]) } } -impl Display for MaterializedPath { +impl Display for MaterializedPath<'_> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.materialized_path) } @@ -175,6 +214,8 @@ impl Display for MaterializedPath { #[derive(Error, Debug)] pub enum FilePathError { + #[error("File Path not found: ")] + NotFound(PathBuf), #[error("Received an invalid sub path: ")] InvalidSubPath { location_path: PathBuf, @@ -213,27 +254,40 @@ impl LastFilePathIdManager { Default::default() } - pub async fn get_max_file_path_id( + pub async fn sync( &self, location_id: LocationId, db: &PrismaClient, + ) -> Result<(), FilePathError> { + if let Some(mut id_ref) = self.last_id_by_location.get_mut(&location_id) { + *id_ref = Self::fetch_max_file_path_id(location_id, db).await?; + } + + Ok(()) + } + + pub async fn increment( + &self, + location_id: LocationId, + by: i32, + db: &PrismaClient, ) -> Result { Ok(match self.last_id_by_location.entry(location_id) { - Entry::Occupied(entry) => *entry.get(), + Entry::Occupied(mut entry) => { + let first_free_id = *entry.get() + 1; + *entry.get_mut() += by + 1; + first_free_id + } Entry::Vacant(entry) => { // I wish I could use `or_try_insert_with` method instead of this crappy match, // but we don't have async closures yet ): - let id = Self::fetch_max_file_path_id(location_id, db).await?; - entry.insert(id); - id + let first_free_id = 
Self::fetch_max_file_path_id(location_id, db).await? + 1; + entry.insert(first_free_id + by); + first_free_id } }) } - pub async fn set_max_file_path_id(&self, location_id: LocationId, id: i32) { - self.last_id_by_location.insert(location_id, id); - } - async fn fetch_max_file_path_id( location_id: LocationId, db: &PrismaClient, @@ -259,8 +313,10 @@ impl LastFilePathIdManager { location_id, name, extension, - }: MaterializedPath, + }: MaterializedPath<'_>, parent_id: Option, + inode: u64, + device: u64, ) -> Result { // Keeping a reference in that map for the entire duration of the function, so we keep it locked let mut last_id_ref = match self.last_id_by_location.entry(location_id) { @@ -278,9 +334,11 @@ impl LastFilePathIdManager { .create( next_id, location::id::equals(location_id), - materialized_path, - name, - extension, + materialized_path.into_owned(), + name.into_owned(), + extension.into_owned(), + inode.to_le_bytes().into(), + device.to_le_bytes().into(), vec![ file_path::parent_id::set(parent_id), file_path::is_dir::set(is_dir), @@ -324,11 +382,10 @@ pub fn extract_materialized_path( }) } -pub async fn find_many_file_paths_by_full_path<'db>( +pub async fn filter_file_paths_by_many_full_path_params( location: &location::Data, full_paths: &[impl AsRef], - db: &'db PrismaClient, -) -> Result, FilePathError> { +) -> Result, FilePathError> { let is_dirs = try_join_all( full_paths .iter() @@ -345,101 +402,130 @@ pub async fn find_many_file_paths_by_full_path<'db>( // Collecting in a Result, so we stop on the first error .collect::, _>>()?; - Ok(db.file_path().find_many(vec![ + Ok(vec![ file_path::location_id::equals(location.id), file_path::materialized_path::in_vec(materialized_paths), - ])) + ]) +} + +#[cfg(feature = "location-watcher")] +pub async fn check_existing_file_path( + materialized_path: &MaterializedPath<'_>, + db: &PrismaClient, +) -> Result { + db.file_path() + .count(filter_existing_file_path_params(materialized_path)) + .exec() + .await + 
.map_or_else(|e| Err(e.into()), |count| Ok(count > 0)) +} + +pub fn filter_existing_file_path_params( + MaterializedPath { + materialized_path, + is_dir, + location_id, + name, + extension, + }: &MaterializedPath, +) -> Vec { + let mut params = vec![ + file_path::location_id::equals(*location_id), + file_path::materialized_path::equals(materialized_path.to_string()), + file_path::is_dir::equals(*is_dir), + file_path::extension::equals(extension.to_string()), + ]; + + // This is due to a limitation of MaterializedPath, where we don't know the location name to use + // as the file_path name at the root of the location "/" or "\" on Windows + if materialized_path != MAIN_SEPARATOR_STR { + params.push(file_path::name::equals(name.to_string())); + } + + params +} + +/// With this function we try to do a loose filtering of file paths, to avoid having to do check +/// twice for directories and for files. This is because directories have a trailing `/` or `\` in +/// the materialized path +pub fn loose_find_existing_file_path_params( + MaterializedPath { + materialized_path, + is_dir, + location_id, + name, + .. 
+ }: &MaterializedPath, +) -> Vec { + let mut materialized_path_str = materialized_path.to_string(); + if *is_dir { + materialized_path_str.pop(); + } + + let mut params = vec![ + file_path::location_id::equals(*location_id), + file_path::materialized_path::starts_with(materialized_path_str), + ]; + + // This is due to a limitation of MaterializedPath, where we don't know the location name to use + // as the file_path name at the root of the location "/" or "\" on Windows + if materialized_path != MAIN_SEPARATOR_STR { + params.push(file_path::name::equals(name.to_string())); + } + + params } pub async fn get_existing_file_path_id( - materialized_path: MaterializedPath, + materialized_path: &MaterializedPath<'_>, db: &PrismaClient, ) -> Result, FilePathError> { db.file_path() - .find_first(vec![ - file_path::location_id::equals(materialized_path.location_id), - file_path::materialized_path::equals(materialized_path.into()), - ]) + .find_first(filter_existing_file_path_params(materialized_path)) .select(file_path::select!({ id })) .exec() .await .map_or_else(|e| Err(e.into()), |r| Ok(r.map(|r| r.id))) } -#[cfg(feature = "location-watcher")] -pub async fn get_existing_file_path( - materialized_path: MaterializedPath, - db: &PrismaClient, -) -> Result, FilePathError> { - db.file_path() - .find_first(vec![ - file_path::location_id::equals(materialized_path.location_id), - file_path::materialized_path::equals(materialized_path.into()), - ]) - .exec() - .await - .map_err(Into::into) -} - -#[cfg(feature = "location-watcher")] -pub async fn get_existing_file_path_with_object( - materialized_path: MaterializedPath, - db: &PrismaClient, -) -> Result, FilePathError> { - db.file_path() - .find_first(vec![ - file_path::location_id::equals(materialized_path.location_id), - file_path::materialized_path::equals(materialized_path.into()), - ]) - // include object for orphan check - .include(file_path_with_object::include()) - .exec() - .await - .map_err(Into::into) -} - 
-#[cfg(feature = "location-watcher")] -pub async fn get_existing_file_or_directory( - location: &super::location_with_indexer_rules::Data, - path: impl AsRef, - db: &PrismaClient, -) -> Result, FilePathError> { - let mut maybe_file_path = get_existing_file_path_with_object( - MaterializedPath::new(location.id, &location.path, path.as_ref(), false)?, - db, - ) - .await?; - // First we just check if this path was a file in our db, if it isn't then we check for a directory - if maybe_file_path.is_none() { - maybe_file_path = get_existing_file_path_with_object( - MaterializedPath::new(location.id, &location.path, path.as_ref(), true)?, - db, - ) - .await?; - } - - Ok(maybe_file_path) -} - #[cfg(feature = "location-watcher")] pub async fn get_parent_dir( - materialized_path: &MaterializedPath, + materialized_path: &MaterializedPath<'_>, db: &PrismaClient, ) -> Result, FilePathError> { - get_existing_file_path(materialized_path.parent(), db).await + db.file_path() + .find_first(filter_existing_file_path_params( + &materialized_path.parent(), + )) + .exec() + .await + .map_err(Into::into) +} + +#[cfg(feature = "location-watcher")] +pub async fn get_parent_dir_id( + materialized_path: &MaterializedPath<'_>, + db: &PrismaClient, +) -> Result, FilePathError> { + get_existing_file_path_id(&materialized_path.parent(), db).await } pub async fn ensure_sub_path_is_in_location( location_path: impl AsRef, sub_path: impl AsRef, ) -> Result { - let sub_path = sub_path.as_ref(); + let mut sub_path = sub_path.as_ref(); + if sub_path.starts_with(MAIN_SEPARATOR_STR) { + // SAFETY: we just checked that it starts with the separator + sub_path = sub_path.strip_prefix(MAIN_SEPARATOR_STR).unwrap(); + } let location_path = location_path.as_ref(); if !sub_path.starts_with(location_path) { // If the sub_path doesn't start with the location_path, we have to check if it's a // materialized path received from the frontend, then we check if the full path exists let full_path = 
location_path.join(sub_path); + match fs::metadata(&full_path).await { Ok(_) => Ok(full_path), Err(e) if e.kind() == io::ErrorKind::NotFound => Err(FilePathError::InvalidSubPath { @@ -457,8 +543,7 @@ pub async fn ensure_sub_path_is_directory( location_path: impl AsRef, sub_path: impl AsRef, ) -> Result<(), FilePathError> { - let sub_path = sub_path.as_ref(); - let location_path = location_path.as_ref(); + let mut sub_path = sub_path.as_ref(); match fs::metadata(sub_path).await { Ok(meta) => { @@ -469,6 +554,13 @@ pub async fn ensure_sub_path_is_directory( } } Err(e) if e.kind() == io::ErrorKind::NotFound => { + if sub_path.starts_with(MAIN_SEPARATOR_STR) { + // SAFETY: we just checked that it starts with the separator + sub_path = sub_path.strip_prefix(MAIN_SEPARATOR_STR).unwrap(); + } + + let location_path = location_path.as_ref(); + match fs::metadata(location_path.join(sub_path)).await { Ok(meta) => { if meta.is_file() { @@ -489,3 +581,83 @@ pub async fn ensure_sub_path_is_directory( Err(e) => Err(e.into()), } } + +pub async fn retain_file_paths_in_location( + location_id: LocationId, + to_retain: Vec, + maybe_parent_file_path: Option, + db: &PrismaClient, +) -> Result { + let mut to_delete_params = vec![ + file_path::location_id::equals(location_id), + file_path::id::not_in_vec(to_retain), + ]; + + if let Some(parent_file_path) = maybe_parent_file_path { + // If the parent_materialized_path is not the root path, we only delete file paths that start with the parent path + if parent_file_path.materialized_path != MAIN_SEPARATOR_STR { + to_delete_params.push(file_path::materialized_path::starts_with( + parent_file_path.materialized_path, + )); + } else { + // If the parent_materialized_path is the root path, we fetch children using the parent id + to_delete_params.push(file_path::parent_id::equals(Some(parent_file_path.id))); + } + } + + db.file_path() + .delete_many(to_delete_params) + .exec() + .await + .map_err(Into::into) +} + +#[allow(unused)] // TODO remove 
this annotation when we can use it on windows +pub fn get_inode_and_device(metadata: &Metadata) -> Result<(u64, u64), FilePathError> { + #[cfg(target_family = "unix")] + { + use std::os::unix::fs::MetadataExt; + + Ok((metadata.ino(), metadata.dev())) + } + + #[cfg(target_family = "windows")] + { + // TODO use this when it's stable and remove winapi-utils dependency + + // use std::os::windows::fs::MetadataExt; + + // Ok(( + // metadata + // .file_index() + // .expect("This function must not be called from a `DirEntry`'s `Metadata"), + // metadata + // .volume_serial_number() + // .expect("This function must not be called from a `DirEntry`'s `Metadata") as u64, + // )) + + todo!("Use metadata: {:#?}", metadata) + } +} + +#[allow(unused)] +pub async fn get_inode_and_device_from_path( + path: impl AsRef, +) -> Result<(u64, u64), FilePathError> { + #[cfg(target_family = "unix")] + { + // TODO use this when it's stable and remove winapi-utils dependency + let metadata = fs::metadata(path.as_ref()).await?; + + get_inode_and_device(&metadata) + } + + #[cfg(target_family = "windows")] + { + use winapi_util::{file::information, Handle}; + + let info = information(&Handle::from_path_any(path.as_ref())?)?; + + Ok((info.file_index(), info.volume_serial_number())) + } +} diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index c86e48eaf..df55847d4 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -3,8 +3,9 @@ use crate::{ library::Library, location::file_path_helper::{ ensure_sub_path_is_directory, ensure_sub_path_is_in_location, - file_path_just_id_materialized_path, find_many_file_paths_by_full_path, - get_existing_file_path_id, MaterializedPath, + file_path_just_id_materialized_path, filter_existing_file_path_params, + filter_file_paths_by_many_full_path_params, retain_file_paths_in_location, + MaterializedPath, }, prisma::location, }; @@ -54,13 +55,6 @@ impl StatefulJob for 
IndexerJob { let location_id = state.init.location.id; let location_path = Path::new(&state.init.location.path); - // grab the next id so we can increment in memory for batch inserting - let first_file_id = last_file_path_id_manager - .get_max_file_path_id(location_id, db) - .await - .map_err(IndexerError::from)? - + 1; - let mut indexer_rules_by_kind: HashMap> = HashMap::with_capacity(state.init.location.indexer_rules.len()); for location_rule in &state.init.location.indexer_rules { @@ -74,7 +68,8 @@ impl StatefulJob for IndexerJob { let mut dirs_ids = HashMap::new(); - let to_walk_path = if let Some(ref sub_path) = state.init.sub_path { + let (to_walk_path, maybe_parent_file_path) = if let Some(ref sub_path) = state.init.sub_path + { let full_path = ensure_sub_path_is_in_location(location_path, sub_path) .await .map_err(IndexerError::from)?; @@ -82,21 +77,24 @@ impl StatefulJob for IndexerJob { .await .map_err(IndexerError::from)?; - let sub_path_file_path_id = get_existing_file_path_id( - MaterializedPath::new(location_id, location_path, &full_path, true) - .map_err(IndexerError::from)?, - db, - ) - .await - .map_err(IndexerError::from)? - .expect("Sub path should already exist in the database"); + let sub_path_file_path = db + .file_path() + .find_first(filter_existing_file_path_params( + &MaterializedPath::new(location_id, location_path, &full_path, true) + .map_err(IndexerError::from)?, + )) + .select(file_path_just_id_materialized_path::select()) + .exec() + .await + .map_err(IndexerError::from)? 
+ .expect("Sub path should already exist in the database"); // If we're operating with a sub_path, then we have to put its id on `dirs_ids` map - dirs_ids.insert(full_path.clone(), sub_path_file_path_id); + dirs_ids.insert(full_path.clone(), sub_path_file_path.id); - full_path + (full_path, Some(sub_path_file_path)) } else { - location_path.to_path_buf() + (location_path.to_path_buf(), None) }; let scan_start = Instant::now(); @@ -118,29 +116,53 @@ impl StatefulJob for IndexerJob { ) .await?; + // NOTE: + // As we're passing the list of currently existing file paths to the `find_many_file_paths_by_full_path` query, + // it means that `dirs_ids` contains just paths that still exists on the filesystem. dirs_ids.extend( - find_many_file_paths_by_full_path( - &location::Data::from(&state.init.location), - &found_paths - .iter() - .map(|entry| &entry.path) - .collect::>(), - db, - ) - .await - .map_err(IndexerError::from)? - .select(file_path_just_id_materialized_path::select()) - .exec() - .await? - .into_iter() - .map(|file_path| { - ( - location_path.join(file_path.materialized_path), - file_path.id, + db.file_path() + .find_many( + filter_file_paths_by_many_full_path_params( + &location::Data::from(&state.init.location), + &found_paths + .iter() + .map(|entry| &entry.path) + .collect::>(), + ) + .await + .map_err(IndexerError::from)?, ) - }), + .select(file_path_just_id_materialized_path::select()) + .exec() + .await? 
+ .into_iter() + .map(|file_path| { + ( + location_path.join(&MaterializedPath::from(( + location_id, + &file_path.materialized_path, + ))), + file_path.id, + ) + }), ); + // Removing all other file paths that are not in the filesystem anymore + let removed_paths = retain_file_paths_in_location( + location_id, + dirs_ids.values().copied().collect(), + maybe_parent_file_path, + db, + ) + .await + .map_err(IndexerError::from)?; + + // Syncing the last file path id manager, as we potentially just removed a bunch of ids + last_file_path_id_manager + .sync(location_id, db) + .await + .map_err(IndexerError::from)?; + let mut new_paths = found_paths .into_iter() .filter_map(|entry| { @@ -169,6 +191,8 @@ impl StatefulJob for IndexerJob { dirs_ids.get(parent_dir).copied() }), full_path: entry.path, + inode: entry.inode, + device: entry.device, } }) }, @@ -177,16 +201,16 @@ impl StatefulJob for IndexerJob { .collect::>(); let total_paths = new_paths.len(); - let last_file_id = first_file_id + total_paths as i32; - // Setting our global state for `file_path` ids - last_file_path_id_manager - .set_max_file_path_id(location_id, last_file_id) - .await; + // grab the next id so we can increment in memory for batch inserting + let first_file_id = last_file_path_id_manager + .increment(location_id, total_paths as i32, db) + .await + .map_err(IndexerError::from)?; new_paths .iter_mut() - .zip(first_file_id..last_file_id) + .zip(first_file_id..) 
.for_each(|(entry, file_id)| { // If the `parent_id` is still none here, is because the parent of this entry is also // a new one in the DB @@ -205,6 +229,7 @@ impl StatefulJob for IndexerJob { scan_read_time: scan_start.elapsed(), total_paths, indexed_paths: 0, + removed_paths, }); state.steps = new_paths diff --git a/core/src/location/indexer/mod.rs b/core/src/location/indexer/mod.rs index 2acdebdea..e7d4d68a9 100644 --- a/core/src/location/indexer/mod.rs +++ b/core/src/location/indexer/mod.rs @@ -59,6 +59,7 @@ pub struct IndexerJobData { scan_read_time: Duration, total_paths: usize, indexed_paths: i64, + removed_paths: i64, } /// `IndexerJobStep` is a type alias, specifying that each step of the [`IndexerJob`] is a vector of @@ -70,10 +71,12 @@ pub type IndexerJobStep = Vec; #[derive(Serialize, Deserialize)] pub struct IndexerJobStepEntry { full_path: PathBuf, - materialized_path: MaterializedPath, + materialized_path: MaterializedPath<'static>, created_at: DateTime, file_id: i32, parent_id: Option, + inode: u64, + device: u64, } impl IndexerJobData { @@ -175,6 +178,8 @@ async fn execute_indexer_step( ("name", json!(name.clone())), ("is_dir", json!(is_dir)), ("extension", json!(extension.clone())), + ("inode", json!(entry.inode)), + ("device", json!(entry.device)), ("parent_id", json!(entry.parent_id)), ("date_created", json!(entry.created_at)), ], @@ -182,9 +187,11 @@ async fn execute_indexer_step( file_path::create_unchecked( entry.file_id, location.id, - materialized_path, - name, - extension, + materialized_path.into_owned(), + name.into_owned(), + extension.into_owned(), + entry.inode.to_le_bytes().into(), + entry.device.to_le_bytes().into(), vec![ is_dir::set(is_dir), parent_id::set(entry.parent_id), @@ -224,7 +231,7 @@ where .as_ref() .expect("critical error: missing data on job state"); - tracing::info!( + info!( "scan of {} completed in {:?}. {} new files found, \ indexed {} files in db. 
db write completed in {:?}", location_path.as_ref().display(), @@ -236,7 +243,7 @@ where .expect("critical error: non-negative duration"), ); - if data.indexed_paths > 0 { + if data.indexed_paths > 0 || data.removed_paths > 0 { invalidate_query!(ctx.library, "locations.getExplorerData"); } diff --git a/core/src/location/indexer/shallow_indexer_job.rs b/core/src/location/indexer/shallow_indexer_job.rs index 0f3cf1161..d619adee7 100644 --- a/core/src/location/indexer/shallow_indexer_job.rs +++ b/core/src/location/indexer/shallow_indexer_job.rs @@ -3,8 +3,9 @@ use crate::{ library::Library, location::file_path_helper::{ ensure_sub_path_is_directory, ensure_sub_path_is_in_location, - file_path_just_id_materialized_path, find_many_file_paths_by_full_path, - get_existing_file_path_id, MaterializedPath, + file_path_just_id_materialized_path, filter_existing_file_path_params, + filter_file_paths_by_many_full_path_params, retain_file_paths_in_location, + MaterializedPath, }, prisma::location, }; @@ -74,13 +75,6 @@ impl StatefulJob for ShallowIndexerJob { let location_id = state.init.location.id; let location_path = Path::new(&state.init.location.path); - // grab the next id so we can increment in memory for batch inserting - let first_file_id = last_file_path_id_manager - .get_max_file_path_id(location_id, db) - .await - .map_err(IndexerError::from)? 
- + 1; - let mut indexer_rules_by_kind: HashMap> = HashMap::with_capacity(state.init.location.indexer_rules.len()); for location_rule in &state.init.location.indexer_rules { @@ -92,7 +86,7 @@ impl StatefulJob for ShallowIndexerJob { .push(indexer_rule); } - let (to_walk_path, parent_id) = if state.init.sub_path != Path::new("") { + let (to_walk_path, parent_file_path) = if state.init.sub_path != Path::new("") { let full_path = ensure_sub_path_is_in_location(location_path, &state.init.sub_path) .await .map_err(IndexerError::from)?; @@ -100,28 +94,33 @@ impl StatefulJob for ShallowIndexerJob { .await .map_err(IndexerError::from)?; + let materialized_path = + MaterializedPath::new(location_id, location_path, &full_path, true) + .map_err(IndexerError::from)?; + ( - location_path.join(&state.init.sub_path), - get_existing_file_path_id( - MaterializedPath::new(location_id, location_path, &full_path, true) - .map_err(IndexerError::from)?, - db, - ) - .await - .map_err(IndexerError::from)? - .expect("Sub path should already exist in the database"), + full_path, + db.file_path() + .find_first(filter_existing_file_path_params(&materialized_path)) + .select(file_path_just_id_materialized_path::select()) + .exec() + .await + .map_err(IndexerError::from)? + .expect("Sub path should already exist in the database"), ) } else { ( location_path.to_path_buf(), - get_existing_file_path_id( - MaterializedPath::new(location_id, location_path, location_path, true) - .map_err(IndexerError::from)?, - db, - ) - .await - .map_err(IndexerError::from)? - .expect("Location root path should already exist in the database"), + db.file_path() + .find_first(filter_existing_file_path_params( + &MaterializedPath::new(location_id, location_path, location_path, true) + .map_err(IndexerError::from)?, + )) + .select(file_path_just_id_materialized_path::select()) + .exec() + .await + .map_err(IndexerError::from)? 
+ .expect("Location root path should already exist in the database"), ) }; @@ -141,22 +140,42 @@ impl StatefulJob for ShallowIndexerJob { ) .await?; - let already_existing_file_paths = find_many_file_paths_by_full_path( - &location::Data::from(&state.init.location), - &found_paths - .iter() - .map(|entry| &entry.path) - .collect::>(), - db, - ) - .await - .map_err(IndexerError::from)? - .select(file_path_just_id_materialized_path::select()) - .exec() - .await? - .into_iter() - .map(|file_path| file_path.materialized_path) - .collect::>(); + let (already_existing_file_paths, mut to_retain) = db + .file_path() + .find_many( + filter_file_paths_by_many_full_path_params( + &location::Data::from(&state.init.location), + &found_paths + .iter() + .map(|entry| &entry.path) + .collect::>(), + ) + .await + .map_err(IndexerError::from)?, + ) + .select(file_path_just_id_materialized_path::select()) + .exec() + .await? + .into_iter() + .map(|file_path| (file_path.materialized_path, file_path.id)) + .unzip::<_, _, HashSet<_>, Vec<_>>(); + + let parent_id = parent_file_path.id; + + // Adding our parent path id + to_retain.push(parent_id); + + // Removing all other file paths that are not in the filesystem anymore + let removed_paths = + retain_file_paths_in_location(location_id, to_retain, Some(parent_file_path), db) + .await + .map_err(IndexerError::from)?; + + // Syncing the last file path id manager, as we potentially just removed a bunch of ids + last_file_path_id_manager + .sync(location_id, db) + .await + .map_err(IndexerError::from)?; // Filter out paths that are already in the databases let mut new_paths = found_paths @@ -177,6 +196,8 @@ impl StatefulJob for ShallowIndexerJob { created_at: entry.created_at, file_id: 0, // To be set later parent_id: Some(parent_id), + inode: entry.inode, + device: entry.device, }) }, ) @@ -186,16 +207,16 @@ impl StatefulJob for ShallowIndexerJob { .collect::>(); let total_paths = new_paths.len(); - let last_file_id = first_file_id + 
total_paths as i32; - // Setting our global state for file_path ids - last_file_path_id_manager - .set_max_file_path_id(location_id, last_file_id) - .await; + // grab the next id so we can increment in memory for batch inserting + let first_file_id = last_file_path_id_manager + .increment(location_id, total_paths as i32, db) + .await + .map_err(IndexerError::from)?; new_paths .iter_mut() - .zip(first_file_id..last_file_id) + .zip(first_file_id..) .for_each(|(entry, file_id)| { entry.file_id = file_id; }); @@ -207,6 +228,7 @@ impl StatefulJob for ShallowIndexerJob { scan_read_time: scan_start.elapsed(), total_paths, indexed_paths: 0, + removed_paths, }); state.steps = new_paths diff --git a/core/src/location/indexer/walk.rs b/core/src/location/indexer/walk.rs index d27079cf0..5494674ed 100644 --- a/core/src/location/indexer/walk.rs +++ b/core/src/location/indexer/walk.rs @@ -1,10 +1,17 @@ -use chrono::{DateTime, Utc}; +#[cfg(target_family = "unix")] +use crate::location::file_path_helper::get_inode_and_device; + +#[cfg(target_family = "windows")] +use crate::location::file_path_helper::get_inode_and_device_from_path; + use std::{ cmp::Ordering, collections::{HashMap, VecDeque}, hash::{Hash, Hasher}, path::{Path, PathBuf}, }; + +use chrono::{DateTime, Utc}; use tokio::fs; use tracing::{error, trace}; @@ -20,6 +27,8 @@ pub(super) struct WalkEntry { pub(super) path: PathBuf, pub(super) is_dir: bool, pub(super) created_at: DateTime, + pub(super) inode: u64, + pub(super) device: u64, } impl PartialEq for WalkEntry { @@ -137,7 +146,7 @@ async fn inner_walk_single_dir( ); if let Some(reject_rules) = rules_per_kind.get(&RuleKind::RejectFilesByGlob) { for reject_rule in reject_rules { - // It's ok to unwrap here, reject rules are infallible + // SAFETY: It's ok to unwrap here, reject rules of this kind are infallible if !reject_rule.apply(¤t_path).await.unwrap() { trace!( "Path {} rejected by rule {}", @@ -158,6 +167,27 @@ async fn inner_walk_single_dir( let is_dir = 
metadata.is_dir(); + let (inode, device) = match { + #[cfg(target_family = "unix")] + { + get_inode_and_device(&metadata) + } + + #[cfg(target_family = "windows")] + { + get_inode_and_device_from_path(¤t_path).await + } + } { + Ok(inode_and_device) => inode_and_device, + Err(e) => { + error!( + "Error getting inode and device for {}: {e}", + current_path.display(), + ); + continue 'entries; + } + }; + if is_dir { // If it is a directory, first we check if we must reject it and its children entirely if let Some(reject_by_children_rules) = @@ -259,6 +289,8 @@ async fn inner_walk_single_dir( path: current_path.clone(), is_dir, created_at: metadata.created()?.into(), + inode, + device, }, ); @@ -270,12 +302,27 @@ async fn inner_walk_single_dir( { trace!("Indexing ancestor {}", ancestor.display()); if !indexed_paths.contains_key(ancestor) { + let metadata = fs::metadata(ancestor).await?; + let (inode, device) = { + #[cfg(target_family = "unix")] + { + get_inode_and_device(&metadata)? + } + + #[cfg(target_family = "windows")] + { + get_inode_and_device_from_path(ancestor).await? + } + }; + indexed_paths.insert( ancestor.to_path_buf(), WalkEntry { path: ancestor.to_path_buf(), is_dir: true, - created_at: fs::metadata(ancestor).await?.created()?.into(), + created_at: metadata.created()?.into(), + inode, + device, }, ); } else { @@ -299,11 +346,24 @@ async fn prepared_indexed_paths( if include_root { // Also adding the root location path - let root_created_at = fs::metadata(&root).await?.created()?.into(); + let metadata = fs::metadata(&root).await?; + let (inode, device) = { + #[cfg(target_family = "unix")] + { + get_inode_and_device(&metadata)? + } + + #[cfg(target_family = "windows")] + { + get_inode_and_device_from_path(&root).await? 
+ } + }; indexed_paths.push(WalkEntry { path: root, is_dir: true, - created_at: root_created_at, + created_at: metadata.created()?.into(), + inode, + device, }); } @@ -410,33 +470,35 @@ mod tests { let root = prepare_location().await; let root_path = root.path(); - let any_datetime = Utc::now(); + let created_at = Utc::now(); + let inode = 0; + let device = 0; #[rustfmt::skip] let expected = [ - WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/target"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/target/debug"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/target/debug/main"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: 
root_path.join("inner/node_project/node_modules"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/node_modules/react"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/node_modules/react/package.json"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("photos"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("photos/photo1.png"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("photos/photo2.jpg"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("photos/photo3.jpeg"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("photos/text.txt"), is_dir: false, created_at: any_datetime }, + WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/target"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/target/debug"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/target/debug/main"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("inner"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at, inode, device }, + WalkEntry { path: 
root_path.join("inner/node_project/.git"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/node_modules"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/node_modules/react"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/node_modules/react/package.json"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("photos"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("photos/photo1.png"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("photos/photo2.jpg"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("photos/photo3.jpeg"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("photos/text.txt"), is_dir: false, created_at, inode, device }, ] .into_iter() .collect::>(); @@ -456,15 +518,17 @@ mod tests { let root = prepare_location().await; let root_path = root.path(); - let any_datetime = Utc::now(); + let created_at = Utc::now(); + let inode = 0; + let device = 0; #[rustfmt::skip] let expected = [ - WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("photos"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("photos/photo1.png"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("photos/photo2.jpg"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("photos/photo3.jpeg"), is_dir: false, 
created_at: any_datetime }, + WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("photos"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("photos/photo1.png"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("photos/photo2.jpg"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("photos/photo3.jpeg"), is_dir: false, created_at, inode, device }, ] .into_iter() .collect::>(); @@ -495,28 +559,30 @@ mod tests { let root = prepare_location().await; let root_path = root.path(); - let any_datetime = Utc::now(); + let created_at = Utc::now(); + let inode = 0; + let device = 0; #[rustfmt::skip] let expected = [ - WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/target"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/target/debug"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/target/debug/main"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: 
root_path.join("inner/node_project/package.json"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/node_modules"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/node_modules/react"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/node_modules/react/package.json"), is_dir: false, created_at: any_datetime }, + WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/target"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/target/debug"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/target/debug/main"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("inner"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at, inode, 
device }, + WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/node_modules"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/node_modules/react"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/node_modules/react/package.json"), is_dir: false, created_at, inode, device }, ] .into_iter() .collect::>(); @@ -549,22 +615,24 @@ mod tests { let root = prepare_location().await; let root_path = root.path(); - let any_datetime = Utc::now(); + let created_at = Utc::now(); + let inode = 0; + let device = 0; #[rustfmt::skip] let expected = [ - WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at: any_datetime }, - WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at: any_datetime }, - WalkEntry { path: 
root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at: any_datetime }, + WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("inner"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at, inode, device }, + WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at, inode, device }, ] .into_iter() .collect::>(); diff --git a/core/src/location/manager/helpers.rs b/core/src/location/manager/helpers.rs index ae7d645ab..dd4a56a49 100644 --- a/core/src/location/manager/helpers.rs +++ b/core/src/location/manager/helpers.rs @@ -17,28 +17,31 @@ type LocationAndLibraryKey = (LocationId, LibraryId); const LOCATION_CHECK_INTERVAL: Duration = Duration::from_secs(5); -pub(super) async fn check_online(location: &location::Data, library: &Library) -> bool { - let pub_id = &location.pub_id; +pub(super) async fn check_online( + location: &location::Data, + library: &Library, +) -> Result { + let pub_id = Uuid::from_slice(&location.pub_id)?; if location.node_id == 
library.node_local_id { match fs::metadata(&location.path).await { Ok(_) => { library.location_manager().add_online(pub_id).await; - true + Ok(true) } Err(e) if e.kind() == ErrorKind::NotFound => { - library.location_manager().remove_online(pub_id).await; - false + library.location_manager().remove_online(&pub_id).await; + Ok(false) } Err(e) => { error!("Failed to check if location is online: {:#?}", e); - false + Ok(false) } } } else { // In this case, we don't have a `local_path`, but this location was marked as online - library.location_manager().remove_online(pub_id).await; - false + library.location_manager().remove_online(&pub_id).await; + Err(LocationManagerError::NonLocalLocation(location.id)) } } diff --git a/core/src/location/manager/mod.rs b/core/src/location/manager/mod.rs index aa5b45174..c259c5ee4 100644 --- a/core/src/location/manager/mod.rs +++ b/core/src/location/manager/mod.rs @@ -19,6 +19,7 @@ use tracing::{debug, error}; #[cfg(feature = "location-watcher")] use tokio::sync::mpsc; +use uuid::Uuid; use super::{file_path_helper::FilePathError, LocationId}; @@ -85,8 +86,12 @@ pub enum LocationManagerError { #[error("Failed to stop or reinit a watcher: {reason}")] FailedToStopOrReinitWatcher { reason: String }, - #[error("Location missing local path: ")] - LocationMissingLocalPath(LocationId), + #[error("Missing location from database: ")] + MissingLocation(LocationId), + + #[error("Non local location: ")] + NonLocalLocation(LocationId), + #[error("Tried to update a non-existing file: ")] UpdateNonExistingFile(PathBuf), #[error("Database error: {0}")] @@ -95,9 +100,11 @@ pub enum LocationManagerError { IOError(#[from] io::Error), #[error("File path related error (error: {0})")] FilePathError(#[from] FilePathError), + #[error("Corrupted location pub_id on database: (error: {0})")] + CorruptedLocationPubId(#[from] uuid::Error), } -type OnlineLocations = BTreeSet>; +type OnlineLocations = BTreeSet; #[derive(Debug)] pub struct LocationManager { @@ -324,7 
+331,13 @@ impl LocationManager { // To add a new location ManagementMessageAction::Add => { if let Some(location) = get_location(location_id, &library).await { - let is_online = check_online(&location, &library).await; + let is_online = match check_online(&location, &library).await { + Ok(is_online) => is_online, + Err(e) => { + error!("Error while checking online status of location {location_id}: {e}"); + continue; + } + }; let _ = response_tx.send( LocationWatcher::new(location, library.clone()) .await @@ -426,7 +439,15 @@ impl LocationManager { to_remove.remove(&key); } else if let Some(location) = get_location(location_id, &library).await { if location.node_id == library.node_local_id { - if check_online(&location, &library).await + let is_online = match check_online(&location, &library).await { + Ok(is_online) => is_online, + Err(e) => { + error!("Error while checking online status of location {location_id}: {e}"); + continue; + } + }; + + if is_online && !forced_unwatch.contains(&key) { watch_location( @@ -449,7 +470,7 @@ impl LocationManager { location_id, library.id, "Dropping location from location manager, because \ - we don't have a `local_path` anymore", + it isn't a location in the current node", &mut locations_watched, &mut locations_unwatched ); @@ -477,7 +498,7 @@ impl LocationManager { Ok(()) } - pub async fn is_online(&self, id: &Vec) -> bool { + pub async fn is_online(&self, id: &Uuid) -> bool { let online_locations = self.online_locations.read().await; online_locations.contains(id) } @@ -490,12 +511,12 @@ impl LocationManager { self.online_tx.send(self.get_online().await).ok(); } - pub async fn add_online(&self, id: &[u8]) { - self.online_locations.write().await.insert(id.into()); + pub async fn add_online(&self, id: Uuid) { + self.online_locations.write().await.insert(id); self.broadcast_online().await; } - pub async fn remove_online(&self, id: &Vec) { + pub async fn remove_online(&self, id: &Uuid) { let mut online_locations = 
self.online_locations.write().await; online_locations.remove(id); self.broadcast_online().await; diff --git a/core/src/location/manager/watcher/linux.rs b/core/src/location/manager/watcher/linux.rs index 1ff6ccbd3..f54ab8edf 100644 --- a/core/src/location/manager/watcher/linux.rs +++ b/core/src/location/manager/watcher/linux.rs @@ -1,6 +1,16 @@ -use crate::{ - library::Library, - location::{location_with_indexer_rules, manager::LocationManagerError}, +//! Linux has the best behaving file system events, with just some small caveats: +//! When we move files or directories, we receive 3 events: Rename From, Rename To and Rename Both. +//! But when we move a file or directory to the outside from the watched location, we just receive +//! the Rename From event, so we have to keep track of all rename events to match them against each +//! other. If we have dangling Rename From events, we have to remove them after some time. +//! Aside from that, when a directory is moved to our watched location from the outside, we receive +//! a Create Dir event, this one is actually ok at least. 
+ +use crate::{invalidate_query, library::Library, location::manager::LocationManagerError}; + +use std::{ + collections::{BTreeMap, HashMap}, + path::PathBuf, }; use async_trait::async_trait; @@ -8,43 +18,77 @@ use notify::{ event::{AccessKind, AccessMode, CreateKind, ModifyKind, RenameMode}, Event, EventKind, }; -use tracing::trace; +use tokio::{fs, time::Instant}; +use tracing::{error, trace}; use super::{ - utils::{create_dir, file_creation_or_update, remove_event, rename_both_event}, - EventHandler, + utils::{create_dir, file_creation_or_update, remove, rename}, + EventHandler, LocationId, HUNDRED_MILLIS, }; #[derive(Debug)] -pub(super) struct LinuxEventHandler {} +pub(super) struct LinuxEventHandler<'lib> { + location_id: LocationId, + library: &'lib Library, + last_check_rename: Instant, + rename_from: HashMap, + rename_from_buffer: Vec<(PathBuf, Instant)>, + recently_renamed_from: BTreeMap, +} #[async_trait] -impl EventHandler for LinuxEventHandler { - fn new() -> Self { - Self {} +impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> { + fn new(location_id: LocationId, library: &'lib Library) -> Self { + Self { + location_id, + library, + last_check_rename: Instant::now(), + rename_from: HashMap::new(), + rename_from_buffer: Vec::new(), + recently_renamed_from: BTreeMap::new(), + } } - async fn handle_event( - &mut self, - location: location_with_indexer_rules::Data, - library: &Library, - event: Event, - ) -> Result<(), LocationManagerError> { - trace!("Received Linux event: {:#?}", event); + async fn handle_event(&mut self, event: Event) -> Result<(), LocationManagerError> { + tracing::debug!("Received Linux event: {:#?}", event); - match event.kind { + let Event { + kind, mut paths, .. 
+ } = event; + + match kind { EventKind::Access(AccessKind::Close(AccessMode::Write)) => { // If a file was closed with write mode, then it was updated or created - file_creation_or_update(&location, &event, library).await?; + file_creation_or_update(self.location_id, &paths[0], self.library).await?; } EventKind::Create(CreateKind::Folder) => { - create_dir(&location, &event, library).await?; + let path = &paths[0]; + + create_dir( + self.location_id, + path, + &fs::metadata(path).await?, + self.library, + ) + .await?; } + EventKind::Modify(ModifyKind::Name(RenameMode::From)) => { + // Just in case we can't garantee that we receive the Rename From event before the + // Rename Both event. Just a safeguard + if self.recently_renamed_from.remove(&paths[0]).is_none() { + self.rename_from.insert(paths.remove(0), Instant::now()); + } + } + EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => { - rename_both_event(&location, &event, library).await?; + let from_path = &paths[0]; + self.rename_from.remove(from_path); + rename(self.location_id, &paths[1], from_path, self.library).await?; + self.recently_renamed_from + .insert(paths.swap_remove(0), Instant::now()); } - EventKind::Remove(remove_kind) => { - remove_event(&location, &event, remove_kind, library).await?; + EventKind::Remove(_) => { + remove(self.location_id, &paths[0], self.library).await?; } other_event_kind => { trace!("Other Linux event that we don't handle for now: {other_event_kind:#?}"); @@ -53,4 +97,37 @@ impl EventHandler for LinuxEventHandler { Ok(()) } + + async fn tick(&mut self) { + if self.last_check_rename.elapsed() > HUNDRED_MILLIS { + self.last_check_rename = Instant::now(); + self.handle_rename_from_eviction().await; + + self.recently_renamed_from + .retain(|_, instant| instant.elapsed() < HUNDRED_MILLIS); + } + } +} + +impl LinuxEventHandler<'_> { + async fn handle_rename_from_eviction(&mut self) { + self.rename_from_buffer.clear(); + + for (path, instant) in self.rename_from.drain() { + if 
instant.elapsed() > HUNDRED_MILLIS { + if let Err(e) = remove(self.location_id, &path, self.library).await { + error!("Failed to remove file_path: {e}"); + } else { + trace!("Removed file_path due timeout: {}", path.display()); + invalidate_query!(self.library, "locations.getExplorerData"); + } + } else { + self.rename_from_buffer.push((path, instant)); + } + } + + for (path, instant) in self.rename_from_buffer.drain(..) { + self.rename_from.insert(path, instant); + } + } } diff --git a/core/src/location/manager/watcher/macos.rs b/core/src/location/manager/watcher/macos.rs index a72e40b52..be9be1c53 100644 --- a/core/src/location/manager/watcher/macos.rs +++ b/core/src/location/manager/watcher/macos.rs @@ -1,6 +1,27 @@ +//! On MacOS, we use the FSEvents backend of notify-rs and Rename events are pretty complicated; +//! There are just (ModifyKind::Name(RenameMode::Any) events and nothing else. +//! This means that we have to link the old path with the new path to know which file was renamed. +//! But you can't forget that renames events aren't always the case that I file name was modified, +//! but its path was modified. So we have to check if the file was moved. When a file is moved +//! inside the same location, we received 2 events: one for the old path and one for the new path. +//! But when a file is moved to another location, we only receive the old path event... This +//! way we have to handle like a file deletion, and the same applies for when a file is moved to our +//! current location from anywhere else, we just receive the new path rename event, which means a +//! creation. 
+ use crate::{ + invalidate_query, library::Library, - location::{location_with_indexer_rules, manager::LocationManagerError}, + location::{ + file_path_helper::{check_existing_file_path, get_inode_and_device, MaterializedPath}, + manager::LocationManagerError, + LocationId, + }, +}; + +use std::{ + collections::{BTreeMap, HashMap}, + path::PathBuf, }; use async_trait::async_trait; @@ -8,40 +29,60 @@ use notify::{ event::{CreateKind, DataChange, ModifyKind, RenameMode}, Event, EventKind, }; -use tracing::trace; +use tokio::{fs, io, time::Instant}; +use tracing::{error, trace, warn}; use super::{ - utils::{create_dir, file_creation_or_update, remove_event, rename}, - EventHandler, + utils::{ + create_dir, create_dir_or_file, create_file, extract_inode_and_device_from_path, + extract_location_path, remove, rename, update_file, + }, + EventHandler, INodeAndDevice, InstantAndPath, HUNDRED_MILLIS, ONE_SECOND, }; -#[derive(Debug, Default)] -pub(super) struct MacOsEventHandler { - latest_created_dir: Option, - rename_stack: Option, +#[derive(Debug)] +pub(super) struct MacOsEventHandler<'lib> { + location_id: LocationId, + library: &'lib Library, + recently_created_files: BTreeMap, + last_check_created_files: Instant, + latest_created_dir: Option, + last_check_rename: Instant, + old_paths_map: HashMap, + new_paths_map: HashMap, + paths_map_buffer: Vec<(INodeAndDevice, InstantAndPath)>, } #[async_trait] -impl EventHandler for MacOsEventHandler { - fn new() -> Self +impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> { + fn new(location_id: LocationId, library: &'lib Library) -> Self where Self: Sized, { - Default::default() + Self { + location_id, + library, + recently_created_files: BTreeMap::new(), + last_check_created_files: Instant::now(), + latest_created_dir: None, + last_check_rename: Instant::now(), + old_paths_map: HashMap::new(), + new_paths_map: HashMap::new(), + paths_map_buffer: Vec::new(), + } } - async fn handle_event( - &mut self, - location: 
location_with_indexer_rules::Data, - library: &Library, - event: Event, - ) -> Result<(), LocationManagerError> { + async fn handle_event(&mut self, event: Event) -> Result<(), LocationManagerError> { trace!("Received MacOS event: {:#?}", event); - match event.kind { + let Event { + kind, mut paths, .. + } = event; + + match kind { EventKind::Create(CreateKind::Folder) => { if let Some(latest_created_dir) = self.latest_created_dir.take() { - if event.paths[0] == latest_created_dir.paths[0] { + if paths[0] == latest_created_dir { // NOTE: This is a MacOS specific event that happens when a folder is created // trough Finder. It creates a folder but 2 events are triggered in // FSEvents. So we store and check the latest created folder to avoid @@ -50,26 +91,40 @@ impl EventHandler for MacOsEventHandler { } } - create_dir(&location, &event, library).await?; - self.latest_created_dir = Some(event); + create_dir( + self.location_id, + &paths[0], + &fs::metadata(&paths[0]).await?, + self.library, + ) + .await?; + self.latest_created_dir = Some(paths.remove(0)); + } + EventKind::Create(CreateKind::File) => { + create_file( + self.location_id, + &paths[0], + &fs::metadata(&paths[0]).await?, + self.library, + ) + .await?; + self.recently_created_files + .insert(paths.remove(0), Instant::now()); } EventKind::Modify(ModifyKind::Data(DataChange::Content)) => { - // If a file had its content modified, then it was updated or created - file_creation_or_update(&location, &event, library).await?; - } - EventKind::Modify(ModifyKind::Name(RenameMode::Any)) => { - match self.rename_stack.take() { - None => { - self.rename_stack = Some(event); - } - Some(from_event) => { - rename(&event.paths[0], &from_event.paths[0], &location, library).await?; - } + // NOTE: MacOS emits a Create File and then an Update Content event + // when a file is created. 
So we need to check if the file was recently + // created to avoid unecessary updates + if !self.recently_created_files.contains_key(&paths[0]) { + update_file(self.location_id, &paths[0], self.library).await?; } } + EventKind::Modify(ModifyKind::Name(RenameMode::Any)) => { + self.handle_single_rename_event(paths.remove(0)).await?; + } - EventKind::Remove(remove_kind) => { - remove_event(&location, &event, remove_kind, library).await?; + EventKind::Remove(_) => { + remove(self.location_id, &paths[0], self.library).await?; } other_event_kind => { trace!("Other MacOS event that we don't handle for now: {other_event_kind:#?}"); @@ -78,4 +133,138 @@ impl EventHandler for MacOsEventHandler { Ok(()) } + + async fn tick(&mut self) { + // Cleaning out recently created files that are older than 1 second + if self.last_check_created_files.elapsed() > ONE_SECOND { + self.last_check_created_files = Instant::now(); + self.recently_created_files + .retain(|_, created_at| created_at.elapsed() < ONE_SECOND); + } + + if self.last_check_rename.elapsed() > HUNDRED_MILLIS { + // Cleaning out recently renamed files that are older than 100 milliseconds + self.handle_create_eviction().await; + self.handle_remove_eviction().await; + } + } +} + +impl MacOsEventHandler<'_> { + async fn handle_create_eviction(&mut self) { + // Just to make sure that our buffer is clean + self.paths_map_buffer.clear(); + + for (inode_and_device, (instant, path)) in self.new_paths_map.drain() { + if instant.elapsed() > HUNDRED_MILLIS { + if let Err(e) = create_dir_or_file(self.location_id, &path, self.library).await { + error!("Failed to create file_path on MacOS : {e}"); + } else { + trace!("Created file_path due timeout: {}", path.display()); + invalidate_query!(self.library, "locations.getExplorerData"); + } + } else { + self.paths_map_buffer + .push((inode_and_device, (instant, path))); + } + } + + for (key, value) in self.paths_map_buffer.drain(..) 
{ + self.new_paths_map.insert(key, value); + } + } + + async fn handle_remove_eviction(&mut self) { + // Just to make sure that our buffer is clean + self.paths_map_buffer.clear(); + + for (inode_and_device, (instant, path)) in self.old_paths_map.drain() { + if instant.elapsed() > HUNDRED_MILLIS { + if let Err(e) = remove(self.location_id, &path, self.library).await { + error!("Failed to remove file_path: {e}"); + } else { + trace!("Removed file_path due timeout: {}", path.display()); + invalidate_query!(self.library, "locations.getExplorerData"); + } + } else { + self.paths_map_buffer + .push((inode_and_device, (instant, path))); + } + } + + for (key, value) in self.paths_map_buffer.drain(..) { + self.old_paths_map.insert(key, value); + } + } + + async fn handle_single_rename_event( + &mut self, + path: PathBuf, // this is used internally only once, so we can use just PathBuf + ) -> Result<(), LocationManagerError> { + match fs::metadata(&path).await { + Ok(meta) => { + // File or directory exists, so this can be a "new path" to an actual rename/move or a creation + trace!("Path exists: {}", path.display()); + + let inode_and_device = get_inode_and_device(&meta)?; + let location_path = extract_location_path(self.location_id, self.library).await?; + + if !check_existing_file_path( + &MaterializedPath::new(self.location_id, &location_path, &path, meta.is_dir())?, + &self.library.db, + ) + .await? 
+ { + if let Some((_, old_path)) = self.old_paths_map.remove(&inode_and_device) { + trace!( + "Got a match new -> old: {} -> {}", + path.display(), + old_path.display() + ); + + // We found a new path for this old path, so we can rename it + rename(self.location_id, &path, &old_path, self.library).await?; + } else { + trace!("No match for new path yet: {}", path.display()); + self.new_paths_map + .insert(inode_and_device, (Instant::now(), path)); + } + } else { + warn!( + "Received rename event for a file that already exists in the database: {}", + path.display() + ); + } + } + Err(e) if e.kind() == io::ErrorKind::NotFound => { + // File or directory does not exist in the filesystem, if it exists in the database, + // then we try pairing it with the old path from our map + + trace!("Path doesn't exists: {}", path.display()); + + let inode_and_device = + extract_inode_and_device_from_path(self.location_id, &path, self.library) + .await?; + + if let Some((_, new_path)) = self.new_paths_map.remove(&inode_and_device) { + trace!( + "Got a match old -> new: {} -> {}", + path.display(), + new_path.display() + ); + + // We found a new path for this old path, so we can rename it + rename(self.location_id, &new_path, &path, self.library).await?; + } else { + trace!("No match for old path yet: {}", path.display()); + // We didn't find a new path for this old path, so we store ir for later + self.old_paths_map + .insert(inode_and_device, (Instant::now(), path)); + } + } + Err(e) => return Err(e.into()), + } + + Ok(()) + } } diff --git a/core/src/location/manager/watcher/mod.rs b/core/src/location/manager/watcher/mod.rs index 4abfeba3c..d46cb6800 100644 --- a/core/src/location/manager/watcher/mod.rs +++ b/core/src/location/manager/watcher/mod.rs @@ -1,12 +1,9 @@ -use crate::{ - library::Library, - location::{find_location, location_with_indexer_rules, LocationId}, - prisma::location, -}; +use crate::{library::Library, location::LocationId, prisma::location}; use std::{ 
collections::HashSet, path::{Path, PathBuf}, + time::Duration, }; use async_trait::async_trait; @@ -16,8 +13,10 @@ use tokio::{ select, sync::{mpsc, oneshot}, task::{block_in_place, JoinHandle}, + time::{interval_at, Instant, MissedTickBehavior}, }; use tracing::{debug, error, warn}; +use uuid::Uuid; use super::LocationManagerError; @@ -30,28 +29,34 @@ mod utils; use utils::check_event; #[cfg(target_os = "linux")] -type Handler = linux::LinuxEventHandler; +type Handler<'lib> = linux::LinuxEventHandler<'lib>; #[cfg(target_os = "macos")] -type Handler = macos::MacOsEventHandler; +type Handler<'lib> = macos::MacOsEventHandler<'lib>; #[cfg(target_os = "windows")] -type Handler = windows::WindowsEventHandler; +type Handler<'lib> = windows::WindowsEventHandler<'lib>; pub(super) type IgnorePath = (PathBuf, bool); +type INodeAndDevice = (u64, u64); +type InstantAndPath = (Instant, PathBuf); + +const ONE_SECOND: Duration = Duration::from_secs(1); +const HUNDRED_MILLIS: Duration = Duration::from_millis(100); + #[async_trait] -trait EventHandler { - fn new() -> Self +trait EventHandler<'lib> { + fn new(location_id: LocationId, library: &'lib Library) -> Self where Self: Sized; - async fn handle_event( - &mut self, - location: location_with_indexer_rules::Data, - library: &Library, - event: Event, - ) -> Result<(), LocationManagerError>; + /// Handle a file system event. + async fn handle_event(&mut self, event: Event) -> Result<(), LocationManagerError>; + + /// As Event Handlers have some inner state, from time to time we need to call this tick method + /// so the event handler can update its state. 
+ async fn tick(&mut self); } #[derive(Debug)] @@ -93,6 +98,7 @@ impl LocationWatcher { let handle = tokio::spawn(Self::handle_watch_events( location.id, + Uuid::from_slice(&location.pub_id)?, library, events_rx, ignore_path_rx, @@ -110,15 +116,20 @@ impl LocationWatcher { async fn handle_watch_events( location_id: LocationId, + location_pub_id: Uuid, library: Library, mut events_rx: mpsc::UnboundedReceiver>, mut ignore_path_rx: mpsc::UnboundedReceiver, mut stop_rx: oneshot::Receiver<()>, ) { - let mut event_handler = Handler::new(); + let mut event_handler = Handler::new(location_id, &library); let mut paths_to_ignore = HashSet::new(); + let mut handler_interval = interval_at(Instant::now() + HUNDRED_MILLIS, HUNDRED_MILLIS); + // In case of doubt check: https://docs.rs/tokio/latest/tokio/time/enum.MissedTickBehavior.html + handler_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { select! { Some(event) = events_rx.recv() => { @@ -126,6 +137,7 @@ impl LocationWatcher { Ok(event) => { if let Err(e) = Self::handle_single_event( location_id, + location_pub_id, event, &mut event_handler, &library, @@ -150,6 +162,10 @@ impl LocationWatcher { } } + _ = handler_interval.tick() => { + event_handler.tick().await; + } + _ = &mut stop_rx => { debug!("Stop Location Manager event handler for location: ", location_id); break @@ -158,32 +174,33 @@ impl LocationWatcher { } } - async fn handle_single_event( + async fn handle_single_event<'lib>( location_id: LocationId, + location_pub_id: Uuid, event: Event, - event_handler: &mut impl EventHandler, - library: &Library, + event_handler: &mut impl EventHandler<'lib>, + library: &'lib Library, ignore_paths: &HashSet, ) -> Result<(), LocationManagerError> { if !check_event(&event, ignore_paths) { return Ok(()); } - let Some(location) = find_location(library, location_id) - .include(location_with_indexer_rules::include()) - .exec() - .await? 
- else { - warn!("Tried to handle event for unknown location: "); - return Ok(()); - }; + // let Some(location) = find_location(library, location_id) + // .include(location_with_indexer_rules::include()) + // .exec() + // .await? + // else { + // warn!("Tried to handle event for unknown location: "); + // return Ok(()); + // }; - if !library.location_manager().is_online(&location.pub_id).await { + if !library.location_manager().is_online(&location_pub_id).await { warn!("Tried to handle event for offline location: "); return Ok(()); } - event_handler.handle_event(location, library, event).await + event_handler.handle_event(event).await } pub(super) fn ignore_path( @@ -265,7 +282,7 @@ impl Drop for LocationWatcher { /*************************************************************************************************** * Some tests to validate our assumptions of events through different file systems * -*************************************************************************************************** +**************************************************************************************************** * Events dispatched on Linux: * * Create File: * * 1) EventKind::Create(CreateKind::File) * @@ -334,26 +351,30 @@ impl Drop for LocationWatcher { * Events dispatched on iOS: * * TODO * * * -**************************************************************************************************/ +***************************************************************************************************/ #[cfg(test)] -#[allow(unused)] mod tests { - #[cfg(target_os = "macos")] - use notify::event::DataChange; - use notify::{ - event::{AccessKind, AccessMode, CreateKind, ModifyKind, RemoveKind, RenameMode}, - Config, Event, EventKind, RecommendedWatcher, Watcher, - }; - use std::io::ErrorKind; use std::{ + io::ErrorKind, path::{Path, PathBuf}, time::Duration, }; + + use notify::{ + event::{CreateKind, ModifyKind, RemoveKind, RenameMode}, + Config, Event, EventKind, RecommendedWatcher, 
Watcher, + }; use tempfile::{tempdir, TempDir}; use tokio::{fs, io::AsyncWriteExt, sync::mpsc, time::sleep}; use tracing::{debug, error}; use tracing_test::traced_test; + #[cfg(target_os = "macos")] + use notify::event::DataChange; + + #[cfg(target_os = "linux")] + use notify::event::{AccessKind, AccessMode}; + async fn setup_watcher() -> ( TempDir, RecommendedWatcher, diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index 243e9ef42..d3235cdc3 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -4,11 +4,13 @@ use crate::{ location::{ delete_directory, file_path_helper::{ - extract_materialized_path, file_path_with_object, get_existing_file_or_directory, - get_existing_file_path_with_object, get_parent_dir, MaterializedPath, + extract_materialized_path, file_path_with_object, filter_existing_file_path_params, + get_parent_dir, get_parent_dir_id, loose_find_existing_file_path_params, FilePathError, + MaterializedPath, }, - location_with_indexer_rules, + find_location, location_with_indexer_rules, manager::LocationManagerError, + scan_location_sub_path, LocationId, }, object::{ file_identifier::FileMetadata, @@ -18,105 +20,160 @@ use crate::{ }, validation::hash::file_checksum, }, - prisma::{file_path, object}, + prisma::{file_path, location, object}, }; +#[cfg(target_family = "unix")] +use crate::location::file_path_helper::get_inode_and_device; + +#[cfg(target_family = "windows")] +use crate::location::file_path_helper::get_inode_and_device_from_path; + use std::{ collections::HashSet, - path::{Path, PathBuf}, + fs::Metadata, + path::{Path, PathBuf, MAIN_SEPARATOR, MAIN_SEPARATOR_STR}, str::FromStr, }; -use chrono::{DateTime, FixedOffset, Local, Utc}; -use int_enum::IntEnum; -use notify::{event::RemoveKind, Event}; -use prisma_client_rust::{raw, PrismaValue}; use sd_file_ext::extensions::ImageExtension; + +use chrono::{DateTime, Local}; +use int_enum::IntEnum; 
+use notify::{Event, EventKind}; +use prisma_client_rust::{raw, PrismaValue}; use tokio::{fs, io::ErrorKind}; use tracing::{error, info, trace, warn}; use uuid::Uuid; +use super::INodeAndDevice; + pub(super) fn check_event(event: &Event, ignore_paths: &HashSet) -> bool { - // if path includes .DS_Store, .spacedrive or is in the `ignore_paths` set, we ignore + // if path includes .DS_Store, .spacedrive file creation or is in the `ignore_paths` set, we ignore !event.paths.iter().any(|p| { let path_str = p.to_str().expect("Found non-UTF-8 path"); path_str.contains(".DS_Store") - || path_str.contains(".spacedrive") + || (path_str.contains(".spacedrive") && matches!(event.kind, EventKind::Create(_))) || ignore_paths.contains(p) }) } pub(super) async fn create_dir( - location: &location_with_indexer_rules::Data, - event: &Event, + location_id: LocationId, + path: impl AsRef, + metadata: &Metadata, library: &Library, ) -> Result<(), LocationManagerError> { - if location.node_id != library.node_local_id { - return Ok(()); - } + let location = find_location(library, location_id) + .include(location_with_indexer_rules::include()) + .exec() + .await? + .ok_or(LocationManagerError::MissingLocation(location_id))?; + + let path = path.as_ref(); trace!( "Location: creating directory: {}", location.path, - event.paths[0].display() + path.display() ); - let materialized_path = - MaterializedPath::new(location.id, &location.path, &event.paths[0], true)?; + let materialized_path = MaterializedPath::new(location.id, &location.path, path, true)?; + + let (inode, device) = { + #[cfg(target_family = "unix")] + { + get_inode_and_device(metadata)? + } + + #[cfg(target_family = "windows")] + { + // FIXME: This is a workaround for Windows, because we can't get the inode and device from the metadata + let _ = metadata; // To avoid unused variable warning + get_inode_and_device_from_path(&path).await? 
+ } + }; let parent_directory = get_parent_dir(&materialized_path, &library.db).await?; trace!("parent_directory: {:?}", parent_directory); let Some(parent_directory) = parent_directory else { - warn!("Watcher found a path without parent"); + warn!("Watcher found a directory without parent"); return Ok(()) }; let created_path = library .last_file_path_id_manager - .create_file_path(&library.db, materialized_path, Some(parent_directory.id)) + .create_file_path( + &library.db, + materialized_path, + Some(parent_directory.id), + inode, + device, + ) .await?; info!("Created path: {}", created_path.materialized_path); + // scan the new directory + scan_location_sub_path(library, location, &created_path.materialized_path).await; + invalidate_query!(library, "locations.getExplorerData"); Ok(()) } pub(super) async fn create_file( - location: &location_with_indexer_rules::Data, - event: &Event, + location_id: LocationId, + path: impl AsRef, + metadata: &Metadata, library: &Library, ) -> Result<(), LocationManagerError> { - if location.node_id != library.node_local_id { - return Ok(()); - } - - let full_path = &event.paths[0]; + let path = path.as_ref(); + let location_path = extract_location_path(location_id, library).await?; trace!( "Location: creating file: {}", - &location.path, - full_path.display() + location_path.display(), + path.display() ); let db = &library.db; - let materialized_path = MaterializedPath::new(location.id, &location.path, full_path, false)?; + let materialized_path = MaterializedPath::new(location_id, &location_path, path, false)?; + + let (inode, device) = { + #[cfg(target_family = "unix")] + { + get_inode_and_device(metadata)? + } + + #[cfg(target_family = "windows")] + { + // FIXME: This is a workaround for Windows, because we can't get the inode and device from the metadata + let _ = metadata; // To avoid unused variable warning + get_inode_and_device_from_path(path).await? 
+ } + }; let Some(parent_directory) = - get_parent_dir(&materialized_path, &library.db).await? + get_parent_dir(&materialized_path, db).await? else { - warn!("Watcher found a path without parent"); + warn!("Watcher found a file without parent"); return Ok(()) }; let created_file = library .last_file_path_id_manager - .create_file_path(&library.db, materialized_path, Some(parent_directory.id)) + .create_file_path( + db, + materialized_path, + Some(parent_directory.id), + inode, + device, + ) .await?; info!("Created path: {}", created_file.materialized_path); @@ -126,32 +183,23 @@ pub(super) async fn create_file( cas_id, kind, fs_metadata, - } = FileMetadata::new(&location.path, &created_file.materialized_path).await?; + } = FileMetadata::new( + &location_path, + &MaterializedPath::from((location_id, &created_file.materialized_path)), + ) + .await?; let existing_object = db .object() .find_first(vec![object::file_paths::some(vec![ file_path::cas_id::equals(Some(cas_id.clone())), ])]) + .select(object_just_id_has_thumbnail::select()) .exec() .await?; - let size_str = fs_metadata.len().to_string(); - let object = if let Some(object) = existing_object { - db.object() - .update( - object::id::equals(object.id), - vec![ - object::size_in_bytes::set(size_str), - object::date_indexed::set( - Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()), - ), - ], - ) - .select(object_just_id_has_thumbnail::select()) - .exec() - .await? 
+ object } else { db.object() .create( @@ -161,7 +209,6 @@ pub(super) async fn create_file( DateTime::::from(fs_metadata.created().unwrap()).into(), ), object::kind::set(kind.int_value()), - object::size_in_bytes::set(size_str.clone()), ], ) .select(object_just_id_has_thumbnail::select()) @@ -171,15 +218,14 @@ pub(super) async fn create_file( db.file_path() .update( - file_path::location_id_id(location.id, created_file.id), + file_path::location_id_id(location_id, created_file.id), vec![file_path::object_id::set(Some(object.id))], ) .exec() .await?; - trace!("object: {:#?}", object); if !object.has_thumbnail && !created_file.extension.is_empty() { - generate_thumbnail(&created_file.extension, &cas_id, &event.paths[0], library).await; + generate_thumbnail(&created_file.extension, &cas_id, path, library).await; } invalidate_query!(library, "locations.getExplorerData"); @@ -187,66 +233,109 @@ pub(super) async fn create_file( Ok(()) } +pub(super) async fn create_dir_or_file( + location_id: LocationId, + path: impl AsRef, + library: &Library, +) -> Result { + let metadata = fs::metadata(path.as_ref()).await?; + if metadata.is_dir() { + create_dir(location_id, path, &metadata, library).await + } else { + create_file(location_id, path, &metadata, library).await + } + .map(|_| metadata) +} + pub(super) async fn file_creation_or_update( - location: &location_with_indexer_rules::Data, - event: &Event, + location_id: LocationId, + full_path: impl AsRef, library: &Library, ) -> Result<(), LocationManagerError> { - if let Some(ref file_path) = get_existing_file_path_with_object( - MaterializedPath::new(location.id, &location.path, &event.paths[0], false)?, - &library.db, - ) - .await? 
+ let full_path = full_path.as_ref(); + let location_path = extract_location_path(location_id, library).await?; + + if let Some(ref file_path) = library + .db + .file_path() + .find_first(filter_existing_file_path_params(&MaterializedPath::new( + location_id, + &location_path, + full_path, + false, + )?)) + // include object for orphan check + .include(file_path_with_object::include()) + .exec() + .await? { - inner_update_file(location, file_path, event, library).await + inner_update_file(location_id, file_path, full_path, library).await } else { - // We received None because it is a new file - create_file(location, event, library).await + create_file( + location_id, + full_path, + &fs::metadata(full_path).await?, + library, + ) + .await } } pub(super) async fn update_file( - location: &location_with_indexer_rules::Data, - event: &Event, + location_id: LocationId, + full_path: impl AsRef, library: &Library, ) -> Result<(), LocationManagerError> { - if location.node_id == library.node_local_id { - if let Some(ref file_path) = get_existing_file_path_with_object( - MaterializedPath::new(location.id, &location.path, &event.paths[0], false)?, - &library.db, - ) + let full_path = full_path.as_ref(); + let location_path = extract_location_path(location_id, library).await?; + + if let Some(ref file_path) = library + .db + .file_path() + .find_first(filter_existing_file_path_params(&MaterializedPath::new( + location_id, + &location_path, + full_path, + false, + )?)) + // include object for orphan check + .include(file_path_with_object::include()) + .exec() .await? 
- { - let ret = inner_update_file(location, file_path, event, library).await; - invalidate_query!(library, "locations.getExplorerData"); - ret - } else { - Err(LocationManagerError::UpdateNonExistingFile( - event.paths[0].clone(), - )) - } + { + let ret = inner_update_file(location_id, file_path, full_path, library).await; + invalidate_query!(library, "locations.getExplorerData"); + ret } else { - Err(LocationManagerError::LocationMissingLocalPath(location.id)) + Err(LocationManagerError::UpdateNonExistingFile( + full_path.to_path_buf(), + )) } } async fn inner_update_file( - location: &location_with_indexer_rules::Data, + location_id: LocationId, file_path: &file_path_with_object::Data, - event: &Event, + full_path: impl AsRef, library: &Library, ) -> Result<(), LocationManagerError> { + let full_path = full_path.as_ref(); + let location_path = extract_location_path(location_id, library).await?; trace!( "Location: updating file: {}", - &location.path, - event.paths[0].display() + location_path.display(), + full_path.display() ); let FileMetadata { cas_id, fs_metadata, - .. 
- } = FileMetadata::new(&location.path, &file_path.materialized_path).await?; + kind, + } = FileMetadata::new( + &location_path, + &MaterializedPath::from((location_id, &file_path.materialized_path)), + ) + .await?; if let Some(old_cas_id) = &file_path.cas_id { if old_cas_id != &cas_id { @@ -255,18 +344,17 @@ async fn inner_update_file( .db .file_path() .update( - file_path::location_id_id(location.id, file_path.id), + file_path::location_id_id(location_id, file_path.id), vec![ file_path::cas_id::set(Some(old_cas_id.clone())), - // file_path::size_in_bytes::set(fs_metadata.len().to_string()), - // file_path::kind::set(kind.int_value()), + file_path::size_in_bytes::set(fs_metadata.len().to_string()), file_path::date_modified::set( DateTime::::from(fs_metadata.created().unwrap()).into(), ), file_path::integrity_checksum::set( if file_path.integrity_checksum.is_some() { // If a checksum was already computed, we need to recompute it - Some(file_checksum(&event.paths[0]).await?) + Some(file_checksum(full_path).await?) 
} else { None }, @@ -276,105 +364,149 @@ async fn inner_update_file( .exec() .await?; - if file_path - .object - .as_ref() - .map(|o| o.has_thumbnail) - .unwrap_or_default() - { + if let Some(ref object) = file_path.object { // if this file had a thumbnail previously, we update it to match the new content - if !file_path.extension.is_empty() { - generate_thumbnail(&file_path.extension, &cas_id, &event.paths[0], library) - .await; + if object.has_thumbnail && !file_path.extension.is_empty() { + generate_thumbnail(&file_path.extension, &cas_id, full_path, library).await; + } + + let int_kind = kind.int_value(); + if object.kind != int_kind { + library + .db + .object() + .update( + object::id::equals(object.id), + vec![object::kind::set(int_kind)], + ) + .exec() + .await?; } } + invalidate_query!(library, "locations.getExplorerData"); } } - invalidate_query!(library, "locations.getExplorerData"); - Ok(()) } -pub(super) async fn rename_both_event( - location: &location_with_indexer_rules::Data, - event: &Event, - library: &Library, -) -> Result<(), LocationManagerError> { - rename(&event.paths[1], &event.paths[0], location, library).await -} - pub(super) async fn rename( + location_id: LocationId, new_path: impl AsRef, old_path: impl AsRef, - location: &location_with_indexer_rules::Data, library: &Library, ) -> Result<(), LocationManagerError> { - let mut old_path_materialized = - extract_materialized_path(location.id, &location.path, old_path.as_ref())? 
+ let location_path = extract_location_path(location_id, library).await?; + + let old_path_materialized = + extract_materialized_path(location_id, &location_path, old_path.as_ref())?; + let mut old_path_materialized_str = format!( + "{MAIN_SEPARATOR_STR}{}", + old_path_materialized .to_str() .expect("Found non-UTF-8 path") - .to_string(); + ); let new_path_materialized = - extract_materialized_path(location.id, &location.path, new_path.as_ref())?; - let mut new_path_materialized_str = new_path_materialized - .to_str() - .expect("Found non-UTF-8 path") - .to_string(); + extract_materialized_path(location_id, &location_path, new_path.as_ref())?; + let mut new_path_materialized_str = format!( + "{MAIN_SEPARATOR_STR}{}", + new_path_materialized + .to_str() + .expect("Found non-UTF-8 path") + ); - if let Some(file_path) = get_existing_file_or_directory(location, old_path, &library.db).await? + let old_materialized_path_parent = old_path_materialized + .parent() + .unwrap_or_else(|| Path::new(MAIN_SEPARATOR_STR)); + let new_materialized_path_parent = new_path_materialized + .parent() + .unwrap_or_else(|| Path::new(MAIN_SEPARATOR_STR)); + + // Renaming a file could potentially be a move to another directory, so we check if our parent changed + let changed_parent_id = if old_materialized_path_parent != new_materialized_path_parent { + Some( + get_parent_dir_id( + &MaterializedPath::new( + location_id, + &location_path, + new_path, + true, + )?, + &library.db, + ) + .await? + .expect("CRITICAL ERROR: If we're puting a file in a directory inside our location, then this directory must exist"), + ) + } else { + None + }; + + if let Some(file_path) = library + .db + .file_path() + .find_first(loose_find_existing_file_path_params( + &MaterializedPath::new(location_id, &location_path, old_path, true)?, + )) + .exec() + .await? 
{ // If the renamed path is a directory, we have to update every successor if file_path.is_dir { - if !old_path_materialized.ends_with('/') { - old_path_materialized += "/"; + if !old_path_materialized_str.ends_with(MAIN_SEPARATOR) { + old_path_materialized_str += MAIN_SEPARATOR_STR; } - if !new_path_materialized_str.ends_with('/') { - new_path_materialized_str += "/"; + if !new_path_materialized_str.ends_with(MAIN_SEPARATOR) { + new_path_materialized_str += MAIN_SEPARATOR_STR; } let updated = library .db - ._execute_raw( - raw!( - "UPDATE file_path SET materialized_path = REPLACE(materialized_path, {}, {}) WHERE location_id = {}", - PrismaValue::String(old_path_materialized), - PrismaValue::String(new_path_materialized_str.clone()), - PrismaValue::Int(location.id as i64) - ) - ) + ._execute_raw(raw!( + "UPDATE file_path \ + SET materialized_path = REPLACE(materialized_path, {}, {}) \ + WHERE location_id = {}", + PrismaValue::String(old_path_materialized_str.clone()), + PrismaValue::String(new_path_materialized_str.clone()), + PrismaValue::Int(location_id as i64) + )) .exec() .await?; trace!("Updated {updated} file_paths"); } + let mut update_params = vec![ + file_path::materialized_path::set(new_path_materialized_str), + file_path::name::set( + new_path_materialized + .file_stem() + .unwrap() + .to_str() + .expect("Found non-UTF-8 path") + .to_string(), + ), + file_path::extension::set( + new_path_materialized + .extension() + .map(|s| { + s.to_str() + .expect("Found non-UTF-8 extension in path") + .to_string() + }) + .unwrap_or_default(), + ), + ]; + + if changed_parent_id.is_some() { + update_params.push(file_path::parent_id::set(changed_parent_id)); + } + library .db .file_path() .update( file_path::location_id_id(file_path.location_id, file_path.id), - vec![ - file_path::materialized_path::set(new_path_materialized_str), - file_path::name::set( - new_path_materialized - .file_stem() - .unwrap() - .to_str() - .expect("Found non-UTF-8 path") - .to_string(), - ), 
- file_path::extension::set( - new_path_materialized - .extension() - .map(|s| { - s.to_str() - .expect("Found non-UTF-8 extension in path") - .to_string() - }) - .unwrap_or_default(), - ), - ], + update_params, ) .exec() .await?; @@ -384,66 +516,91 @@ pub(super) async fn rename( Ok(()) } -pub(super) async fn remove_event( - location: &location_with_indexer_rules::Data, - event: &Event, - remove_kind: RemoveKind, +pub(super) async fn remove( + location_id: LocationId, + full_path: impl AsRef, library: &Library, ) -> Result<(), LocationManagerError> { - trace!("removed {remove_kind:#?}"); + let full_path = full_path.as_ref(); + let location_path = extract_location_path(location_id, library).await?; - // if it doesn't either way, then we don't care - if let Some(file_path) = - get_existing_file_or_directory(location, &event.paths[0], &library.db).await? - { - // check file still exists on disk - match fs::metadata(&event.paths[0]).await { - Ok(_) => { - todo!("file has changed in some way, re-identify it") - } - Err(e) if e.kind() == ErrorKind::NotFound => { - // if is doesn't, we can remove it safely from our db - if file_path.is_dir { - delete_directory(library, location.id, Some(file_path.materialized_path)) - .await?; - } else { + // if it doesn't exist either way, then we don't care + let Some(file_path) = library.db + .file_path() + .find_first(loose_find_existing_file_path_params( + &MaterializedPath::new(location_id, &location_path, full_path, true)?, + )) + .exec() + .await? 
else { + return Ok(()); + }; + + remove_by_file_path(location_id, full_path, &file_path, library).await +} + +pub(super) async fn remove_by_file_path( + location_id: LocationId, + path: impl AsRef, + file_path: &file_path::Data, + library: &Library, +) -> Result<(), LocationManagerError> { + // check file still exists on disk + match fs::metadata(path).await { + Ok(_) => { + todo!("file has changed in some way, re-identify it") + } + Err(e) if e.kind() == ErrorKind::NotFound => { + // if it doesn't, we can remove it safely from our db + if file_path.is_dir { + delete_directory( + library, + location_id, + Some(file_path.materialized_path.clone()), + ) + .await?; + } else { + library + .db + .file_path() + .delete(file_path::location_id_id(location_id, file_path.id)) + .exec() + .await?; + + if let Some(object_id) = file_path.object_id { library .db - .file_path() - .delete(file_path::location_id_id(location.id, file_path.id)) + .object() + .delete_many(vec![ + object::id::equals(object_id), + // https://www.prisma.io/docs/reference/api-reference/prisma-client-reference#none + object::file_paths::none(vec![]), + ]) .exec() .await?; - - if let Some(object_id) = file_path.object_id { - library - .db - .object() - .delete_many(vec![ - object::id::equals(object_id), - // https://www.prisma.io/docs/reference/api-reference/prisma-client-reference#none - object::file_paths::none(vec![]), - ]) - .exec() - .await?; - } } } - Err(e) => return Err(e.into()), } - - invalidate_query!(library, "locations.getExplorerData"); + Err(e) => return Err(e.into()), } + // If the file paths we just removed were the last ids in the DB, we decrease the last id from the id manager + library + .last_file_path_id_manager + .sync(location_id, &library.db) + .await?; + + invalidate_query!(library, "locations.getExplorerData"); + Ok(()) } async fn generate_thumbnail( extension: &str, cas_id: &str, - file_path: impl AsRef, + path: impl AsRef, library: &Library, ) { - let file_path = 
file_path.as_ref(); + let path = path.as_ref(); let output_path = library .config() .data_directory() @@ -453,7 +610,7 @@ async fn generate_thumbnail( if let Ok(extension) = ImageExtension::from_str(extension) { if can_generate_thumbnail_for_image(&extension) { - if let Err(e) = generate_image_thumbnail(file_path, &output_path).await { + if let Err(e) = generate_image_thumbnail(path, &output_path).await { error!("Failed to image thumbnail on location manager: {e:#?}"); } } @@ -466,10 +623,55 @@ async fn generate_thumbnail( if let Ok(extension) = VideoExtension::from_str(extension) { if can_generate_thumbnail_for_video(&extension) { - if let Err(e) = generate_video_thumbnail(file_path, &output_path).await { + if let Err(e) = generate_video_thumbnail(path, &output_path).await { error!("Failed to video thumbnail on location manager: {e:#?}"); } } } } } + +pub(super) async fn extract_inode_and_device_from_path( + location_id: LocationId, + path: impl AsRef, + library: &Library, +) -> Result { + let path = path.as_ref(); + let location = find_location(library, location_id) + .select(location::select!({ path })) + .exec() + .await? + .ok_or(LocationManagerError::MissingLocation(location_id))?; + + library + .db + .file_path() + .find_first(loose_find_existing_file_path_params( + &MaterializedPath::new(location_id, &location.path, path, true)?, + )) + .select(file_path::select!( {inode device} )) + .exec() + .await? + .map(|file_path| { + ( + u64::from_le_bytes(file_path.inode[0..8].try_into().unwrap()), + u64::from_le_bytes(file_path.device[0..8].try_into().unwrap()), + ) + }) + .ok_or_else(|| FilePathError::NotFound(path.to_path_buf()).into()) +} + +pub(super) async fn extract_location_path( + location_id: LocationId, + library: &Library, +) -> Result { + find_location(library, location_id) + .select(location::select!({ path })) + .exec() + .await? 
+ .map_or( + Err(LocationManagerError::MissingLocation(location_id)), + // NOTE: The following usage of `PathBuf` doesn't incur a new allocation so it's fine + |location| Ok(PathBuf::from(location.path)), + ) +} diff --git a/core/src/location/manager/watcher/windows.rs b/core/src/location/manager/watcher/windows.rs index 7c5c7c069..a239a2880 100644 --- a/core/src/location/manager/watcher/windows.rs +++ b/core/src/location/manager/watcher/windows.rs @@ -1,6 +1,23 @@ +//! Windows file system event handler implementation has some caveats due to how +//! file system events are emitted on Windows. +//! +//! For example: When a file is moved to another +//! directory, we receive a remove event and then a create event, so to avoid having to actually +//! remove and create the `file_path` in the database, we have to wait some time after receiving +//! a remove event to see if a create event is emitted. If it is, we just update the `file_path` +//! in the database. If not, we remove the file from the database. 
+ use crate::{ + invalidate_query, library::Library, - location::{location_with_indexer_rules, manager::LocationManagerError}, + location::{ + file_path_helper::get_inode_and_device_from_path, manager::LocationManagerError, LocationId, + }, +}; + +use std::{ + collections::{BTreeMap, HashMap}, + path::PathBuf, }; use async_trait::async_trait; @@ -8,70 +25,126 @@ use notify::{ event::{CreateKind, ModifyKind, RenameMode}, Event, EventKind, }; -use tokio::fs; -use tracing::{trace, warn}; +use tokio::{fs, time::Instant}; +use tracing::{error, trace}; use super::{ - utils::{create_dir, create_file, remove_event, rename, update_file}, - EventHandler, + utils::{create_dir_or_file, extract_inode_and_device_from_path, remove, rename, update_file}, + EventHandler, INodeAndDevice, InstantAndPath, HUNDRED_MILLIS, ONE_SECOND, }; -#[derive(Debug, Default)] -pub(super) struct WindowsEventHandler { - rename_stack: Option, - create_file_stack: Option, +/// Windows file system event handler +#[derive(Debug)] +pub(super) struct WindowsEventHandler<'lib> { + location_id: LocationId, + library: &'lib Library, + last_check_recently_files: Instant, + recently_created_files: BTreeMap, + last_check_rename_and_remove: Instant, + rename_from_map: BTreeMap, + rename_to_map: BTreeMap, + to_remove_files: HashMap, + removal_buffer: Vec<(INodeAndDevice, InstantAndPath)>, } #[async_trait] -impl EventHandler for WindowsEventHandler { - fn new() -> Self +impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> { + fn new(location_id: LocationId, library: &'lib Library) -> Self where Self: Sized, { - Default::default() + Self { + location_id, + library, + last_check_recently_files: Instant::now(), + recently_created_files: BTreeMap::new(), + last_check_rename_and_remove: Instant::now(), + rename_from_map: BTreeMap::new(), + rename_to_map: BTreeMap::new(), + to_remove_files: HashMap::new(), + removal_buffer: Vec::new(), + } } - async fn handle_event( - &mut self, - location: 
location_with_indexer_rules::Data, - library: &Library, - event: Event, - ) -> Result<(), LocationManagerError> { + async fn handle_event(&mut self, event: Event) -> Result<(), LocationManagerError> { trace!("Received Windows event: {:#?}", event); + let Event { + kind, mut paths, .. + } = event; - match event.kind { + match kind { EventKind::Create(CreateKind::Any) => { - let metadata = fs::metadata(&event.paths[0]).await?; - if metadata.is_file() { - self.create_file_stack = Some(event); + let inode_and_device = get_inode_and_device_from_path(&paths[0]).await?; + + if let Some((_, old_path)) = self.to_remove_files.remove(&inode_and_device) { + // if previously we added a file to be removed with the same inode and device + // of this "newly created" file, it means that the file was just moved to another location + // so we can treat it just as a file rename, like in other OSes + + trace!( + "Got a rename instead of remove/create: {} -> {}", + old_path.display(), + paths[0].display(), + ); + + // We found a new path for this old path, so we can rename it instead of removing and creating it + rename(self.location_id, &paths[0], &old_path, self.library).await?; } else { - create_dir(&location, &event, library).await?; + let metadata = + create_dir_or_file(self.location_id, &paths[0], self.library).await?; + + if metadata.is_file() { + self.recently_created_files + .insert(paths.remove(0), Instant::now()); + } } } EventKind::Modify(ModifyKind::Any) => { - let metadata = fs::metadata(&event.paths[0]).await?; - if metadata.is_file() { - if let Some(create_file_event) = self.create_file_stack.take() { - create_file(&location, &create_file_event, library).await?; - } else { - update_file(&location, &event, library).await?; + // Windows emits update events right after create events + if !self.recently_created_files.contains_key(&paths[0]) { + let metadata = fs::metadata(&paths[0]).await?; + if metadata.is_file() { + update_file(self.location_id, &paths[0], 
self.library).await?; } - } else { - warn!("Unexpected Windows modify event on a directory"); } } EventKind::Modify(ModifyKind::Name(RenameMode::From)) => { - self.rename_stack = Some(event); + let path = paths.remove(0); + + let inode_and_device = + extract_inode_and_device_from_path(self.location_id, &path, self.library) + .await?; + + if let Some((_, new_path)) = self.rename_to_map.remove(&inode_and_device) { + // We found a new path for this old path, so we can rename it + rename(self.location_id, &new_path, &path, self.library).await?; + } else { + self.rename_from_map + .insert(inode_and_device, (Instant::now(), path)); + } } EventKind::Modify(ModifyKind::Name(RenameMode::To)) => { - let from_event = self - .rename_stack - .take() - .expect("Unexpectedly missing rename from windows event"); - rename(&event.paths[0], &from_event.paths[0], &location, library).await?; + let path = paths.remove(0); + + let inode_and_device = + extract_inode_and_device_from_path(self.location_id, &path, self.library) + .await?; + + if let Some((_, old_path)) = self.rename_to_map.remove(&inode_and_device) { + // We found a old path for this new path, so we can rename it + rename(self.location_id, &path, &old_path, self.library).await?; + } else { + self.rename_from_map + .insert(inode_and_device, (Instant::now(), path)); + } } - EventKind::Remove(remove_kind) => { - remove_event(&location, &event, remove_kind, library).await?; + EventKind::Remove(_) => { + let path = paths.remove(0); + self.to_remove_files.insert( + extract_inode_and_device_from_path(self.location_id, &path, self.library) + .await?, + (Instant::now(), path), + ); } other_event_kind => { @@ -81,4 +154,56 @@ impl EventHandler for WindowsEventHandler { Ok(()) } + + async fn tick(&mut self) { + // Cleaning out recently created files that are older than 1 second + if self.last_check_recently_files.elapsed() > ONE_SECOND { + self.last_check_recently_files = Instant::now(); + self.recently_created_files + .retain(|_, 
created_at| created_at.elapsed() < ONE_SECOND); + } + + if self.last_check_rename_and_remove.elapsed() > HUNDRED_MILLIS { + self.last_check_rename_and_remove = Instant::now(); + self.rename_from_map.retain(|_, (created_at, path)| { + let to_retain = created_at.elapsed() < HUNDRED_MILLIS; + if !to_retain { + trace!("Removing from rename from map: {:#?}", path.display()) + } + to_retain + }); + self.rename_to_map.retain(|_, (created_at, path)| { + let to_retain = created_at.elapsed() < HUNDRED_MILLIS; + if !to_retain { + trace!("Removing from rename to map: {:#?}", path.display()) + } + to_retain + }); + self.handle_removes_eviction().await; + } + } +} + +impl WindowsEventHandler<'_> { + async fn handle_removes_eviction(&mut self) { + self.removal_buffer.clear(); + + for (inode_and_device, (instant, path)) in self.to_remove_files.drain() { + if instant.elapsed() > HUNDRED_MILLIS { + if let Err(e) = remove(self.location_id, &path, self.library).await { + error!("Failed to remove file_path: {e}"); + } else { + trace!("Removed file_path due timeout: {}", path.display()); + invalidate_query!(self.library, "locations.getExplorerData"); + } + } else { + self.removal_buffer + .push((inode_and_device, (instant, path))); + } + } + + for (key, value) in self.removal_buffer.drain(..) 
{ + self.to_remove_files.insert(key, value); + } + } } diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index c2e426eb1..ef00b822f 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -359,15 +359,15 @@ pub async fn scan_location( Ok(()) } -#[allow(dead_code)] +#[cfg(feature = "location-watcher")] pub async fn scan_location_sub_path( library: &Library, location: location_with_indexer_rules::Data, sub_path: impl AsRef, -) -> Result<(), LocationError> { +) { let sub_path = sub_path.as_ref().to_path_buf(); if location.node_id != library.node_local_id { - return Ok(()); + return; } library @@ -400,8 +400,6 @@ pub async fn scan_location_sub_path( IndexerJob {}, )) .await; - - Ok(()) } pub async fn light_scan_location( diff --git a/core/src/object/cas.rs b/core/src/object/cas.rs index c48423dce..007b842e3 100644 --- a/core/src/object/cas.rs +++ b/core/src/object/cas.rs @@ -1,44 +1,59 @@ -use blake3::Hasher; use std::path::Path; + +use blake3::Hasher; +use static_assertions::const_assert; use tokio::{ - fs::File, + fs::{self, File}, io::{self, AsyncReadExt, AsyncSeekExt, SeekFrom}, }; -static SAMPLE_COUNT: u64 = 4; -static SAMPLE_SIZE: u64 = 10000; +const SAMPLE_COUNT: u64 = 4; +const SAMPLE_SIZE: u64 = 1024 * 10; +const HEADER_OR_FOOTER_SIZE: u64 = 1024 * 8; -async fn read_at(file: &mut File, offset: u64, size: u64) -> Result, io::Error> { - let mut buf = vec![0u8; size as usize]; +// minimum file size of 100KiB, to avoid sample hashing for small files as they can be smaller than the total sample size +const MINIMUM_FILE_SIZE: u64 = 1024 * 100; - file.seek(SeekFrom::Start(offset)).await?; - file.read_exact(&mut buf).await?; - - Ok(buf) -} +// Asserting that nobody messed up our consts +const_assert!( + HEADER_OR_FOOTER_SIZE + SAMPLE_COUNT * SAMPLE_SIZE + HEADER_OR_FOOTER_SIZE < MINIMUM_FILE_SIZE +); pub async fn generate_cas_id(path: impl AsRef, size: u64) -> Result { - let mut file = File::open(path).await?; let mut hasher = 
Hasher::new(); hasher.update(&size.to_le_bytes()); - let sample_interval = if SAMPLE_COUNT * SAMPLE_SIZE > size { - size + if size <= MINIMUM_FILE_SIZE { + // For small files, we hash the whole file + fs::read(path).await.map(|buf| { + hasher.update(&buf); + })?; } else { - size / SAMPLE_COUNT - }; + let mut file = File::open(path).await?; + let mut buf = vec![0; SAMPLE_SIZE as usize].into_boxed_slice(); - for i in 0..=SAMPLE_COUNT { - let offset = if i == SAMPLE_COUNT { - size - SAMPLE_SIZE - } else { - sample_interval * i - }; - let buf = read_at(&mut file, offset, SAMPLE_SIZE).await?; + // Hashing the header + file.read_exact(&mut buf[..HEADER_OR_FOOTER_SIZE as usize]) + .await?; + hasher.update(&buf); + + // Sample hashing the inner content of the file + for _ in 0..SAMPLE_COUNT { + file.seek(SeekFrom::Current( + ((size - HEADER_OR_FOOTER_SIZE * 2) / SAMPLE_COUNT) as i64, + )) + .await?; + file.read_exact(&mut buf).await?; + hasher.update(&buf); + } + + // Hashing the footer + file.seek(SeekFrom::End(-(HEADER_OR_FOOTER_SIZE as i64))) + .await?; + file.read_exact(&mut buf[..HEADER_OR_FOOTER_SIZE as usize]) + .await?; hasher.update(&buf); } - let mut id = hasher.finalize().to_hex(); - id.truncate(16); - Ok(id.to_string()) + Ok(hasher.finalize().to_hex()[..16].to_string()) } diff --git a/core/src/object/file_identifier/file_identifier_job.rs b/core/src/object/file_identifier/file_identifier_job.rs index 66766e626..643f2e2de 100644 --- a/core/src/object/file_identifier/file_identifier_job.rs +++ b/core/src/object/file_identifier/file_identifier_job.rs @@ -50,7 +50,7 @@ impl Hash for FileIdentifierJobInit { pub struct FileIdentifierJobState { cursor: FilePathIdAndLocationIdCursor, report: FileIdentifierReport, - maybe_sub_materialized_path: Option, + maybe_sub_materialized_path: Option>, } #[async_trait::async_trait] @@ -192,7 +192,7 @@ impl StatefulJob for FileIdentifierJob { fn orphan_path_filters( location_id: i32, file_path_id: Option, - 
maybe_sub_materialized_path: &Option, + maybe_sub_materialized_path: &Option>, ) -> Vec { let mut params = vec![ file_path::object_id::equals(None), @@ -216,7 +216,7 @@ fn orphan_path_filters( async fn count_orphan_file_paths( db: &PrismaClient, location_id: i32, - maybe_sub_materialized_path: &Option, + maybe_sub_materialized_path: &Option>, ) -> Result { db.file_path() .count(orphan_path_filters( @@ -232,7 +232,7 @@ async fn count_orphan_file_paths( async fn get_orphan_file_paths( db: &PrismaClient, cursor: &FilePathIdAndLocationIdCursor, - maybe_sub_materialized_path: &Option, + maybe_sub_materialized_path: &Option>, ) -> Result, prisma_client_rust::QueryError> { info!( "Querying {} orphan Paths at cursor: {:?}", diff --git a/core/src/object/file_identifier/mod.rs b/core/src/object/file_identifier/mod.rs index a7956d360..1edd20e6a 100644 --- a/core/src/object/file_identifier/mod.rs +++ b/core/src/object/file_identifier/mod.rs @@ -2,7 +2,7 @@ use crate::{ invalidate_query, job::{JobError, JobReportUpdate, JobResult, WorkerContext}, library::Library, - location::file_path_helper::{file_path_for_file_identifier, FilePathError}, + location::file_path_helper::{file_path_for_file_identifier, FilePathError, MaterializedPath}, object::{cas::generate_cas_id, object_for_file_identifier}, prisma::{file_path, location, object, PrismaClient}, sync, @@ -48,9 +48,9 @@ impl FileMetadata { /// Assembles `create_unchecked` params for a given file path pub async fn new( location_path: impl AsRef, - materialized_path: impl AsRef, // TODO: use dedicated CreateUnchecked type + materialized_path: &MaterializedPath<'_>, // TODO: use dedicated CreateUnchecked type ) -> Result { - let path = location_path.as_ref().join(materialized_path.as_ref()); + let path = location_path.as_ref().join(materialized_path); let fs_metadata = fs::metadata(&path).await?; @@ -67,7 +67,7 @@ impl FileMetadata { let cas_id = generate_cas_id(&path, fs_metadata.len()).await?; - info!("Analyzed file: {:?} {:?} 
{:?}", path, cas_id, kind); + info!("Analyzed file: {path:?} {cas_id:?} {kind:?}"); Ok(FileMetadata { cas_id, @@ -104,9 +104,13 @@ async fn identifier_job_step( file_paths: &[file_path_for_file_identifier::Data], ) -> Result<(usize, usize), JobError> { let file_path_metas = join_all(file_paths.iter().map(|file_path| async move { - FileMetadata::new(&location.path, &file_path.materialized_path) - .await - .map(|params| (file_path.id, (params, file_path))) + // NOTE: `file_path`'s `materialized_path` begins with a `/` character so we remove it to join it with `location.path` + FileMetadata::new( + &location.path, + &MaterializedPath::from((location.id, &file_path.materialized_path)), + ) + .await + .map(|params| (file_path.id, (params, file_path))) })) .await .into_iter() @@ -256,7 +260,6 @@ async fn identifier_job_step( vec![ object::date_created::set(fp.date_created), object::kind::set(kind), - object::size_in_bytes::set(size), ], ), ); diff --git a/core/src/object/file_identifier/shallow_file_identifier_job.rs b/core/src/object/file_identifier/shallow_file_identifier_job.rs index b68dc0543..607a41ad3 100644 --- a/core/src/object/file_identifier/shallow_file_identifier_job.rs +++ b/core/src/object/file_identifier/shallow_file_identifier_job.rs @@ -77,7 +77,7 @@ impl StatefulJob for ShallowFileIdentifierJob { .map_err(FileIdentifierJobError::from)?; get_existing_file_path_id( - MaterializedPath::new(location_id, location_path, &full_path, true) + &MaterializedPath::new(location_id, location_path, &full_path, true) .map_err(FileIdentifierJobError::from)?, db, ) @@ -86,7 +86,7 @@ impl StatefulJob for ShallowFileIdentifierJob { .expect("Sub path should already exist in the database") } else { get_existing_file_path_id( - MaterializedPath::new(location_id, location_path, location_path, true) + &MaterializedPath::new(location_id, location_path, location_path, true) .map_err(FileIdentifierJobError::from)?, db, ) diff --git a/core/src/object/fs/encrypt.rs 
b/core/src/object/fs/encrypt.rs index 168992648..88e262cd5 100644 --- a/core/src/object/fs/encrypt.rs +++ b/core/src/object/fs/encrypt.rs @@ -41,7 +41,6 @@ pub struct Metadata { pub important: bool, pub note: Option, pub date_created: chrono::DateTime, - pub date_modified: chrono::DateTime, } const JOB_NAME: &str = "file_encryptor"; @@ -148,7 +147,7 @@ impl StatefulJob for FileEncryptorJob { if state.init.metadata || state.init.preview_media { // if any are requested, we can make the query as it'll be used at least once - if let Some(object) = info.path_data.object.clone() { + if let Some(ref object) = info.path_data.object { if state.init.metadata { let metadata = Metadata { path_id: state.init.path_id, @@ -156,9 +155,8 @@ impl StatefulJob for FileEncryptorJob { hidden: object.hidden, favorite: object.favorite, important: object.important, - note: object.note, + note: object.note.clone(), date_created: object.date_created, - date_modified: object.date_modified, }; header diff --git a/core/src/object/fs/mod.rs b/core/src/object/fs/mod.rs index 073776c52..dceb11ef5 100644 --- a/core/src/object/fs/mod.rs +++ b/core/src/object/fs/mod.rs @@ -1,6 +1,6 @@ use crate::{ job::JobError, - location::file_path_helper::file_path_with_object, + location::file_path_helper::{file_path_with_object, MaterializedPath}, prisma::{file_path, location, PrismaClient}, }; @@ -76,7 +76,10 @@ pub async fn context_menu_fs_info( Ok(FsInfo { fs_path: get_path_from_location_id(db, location_id) .await? 
- .join(&path_data.materialized_path), + .join(&MaterializedPath::from(( + location_id, + &path_data.materialized_path, + ))), path_data, }) } diff --git a/core/src/object/preview/thumbnail/mod.rs b/core/src/object/preview/thumbnail/mod.rs index f25d0442f..b5796e740 100644 --- a/core/src/object/preview/thumbnail/mod.rs +++ b/core/src/object/preview/thumbnail/mod.rs @@ -3,7 +3,9 @@ use crate::{ invalidate_query, job::{JobError, JobReportUpdate, JobResult, WorkerContext}, location::{ - file_path_helper::{file_path_just_materialized_path_cas_id, FilePathError}, + file_path_helper::{ + file_path_just_materialized_path_cas_id, FilePathError, MaterializedPath, + }, LocationId, }, }; @@ -148,7 +150,10 @@ fn finalize_thumbnailer(data: &ThumbnailerJobState, ctx: WorkerContext) -> JobRe "Finished thumbnail generation for location {} at {}", data.report.location_id, data.location_path - .join(&data.report.materialized_path) + .join(&MaterializedPath::from(( + data.report.location_id, + &data.report.materialized_path + ))) .display() ); @@ -185,7 +190,10 @@ async fn inner_process_step( ctx: &WorkerContext, ) -> Result<(), JobError> { // assemble the file path - let path = data.location_path.join(&step.file_path.materialized_path); + let path = data.location_path.join(&MaterializedPath::from(( + data.report.location_id, + &step.file_path.materialized_path, + ))); trace!("image_file {:?}", step); // get cas_id, if none found skip diff --git a/core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs b/core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs index f0ba60326..97b4f99e4 100644 --- a/core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs +++ b/core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs @@ -14,7 +14,7 @@ use crate::{ use std::{ collections::VecDeque, hash::Hash, - path::{Path, PathBuf}, + path::{Path, PathBuf, MAIN_SEPARATOR_STR}, }; use sd_file_ext::extensions::Extension; @@ -80,7 +80,7 @@ impl StatefulJob for ShallowThumbnailerJob { 
.map_err(ThumbnailerError::from)?; get_existing_file_path_id( - MaterializedPath::new(location_id, &location_path, &full_path, true) + &MaterializedPath::new(location_id, &location_path, &full_path, true) .map_err(ThumbnailerError::from)?, db, ) @@ -89,7 +89,7 @@ impl StatefulJob for ShallowThumbnailerJob { .expect("Sub path should already exist in the database") } else { get_existing_file_path_id( - MaterializedPath::new(location_id, &location_path, &location_path, true) + &MaterializedPath::new(location_id, &location_path, &location_path, true) .map_err(ThumbnailerError::from)?, db, ) @@ -149,7 +149,7 @@ impl StatefulJob for ShallowThumbnailerJob { // SAFETY: We know that the sub_path is a valid UTF-8 string because we validated it before state.init.sub_path.to_str().unwrap().to_string() } else { - "".to_string() + MAIN_SEPARATOR_STR.to_string() }, thumbnails_created: 0, }, diff --git a/core/src/object/preview/thumbnail/thumbnailer_job.rs b/core/src/object/preview/thumbnail/thumbnailer_job.rs index 41f6ebea9..15e3389c4 100644 --- a/core/src/object/preview/thumbnail/thumbnailer_job.rs +++ b/core/src/object/preview/thumbnail/thumbnailer_job.rs @@ -167,7 +167,7 @@ impl StatefulJob for ThumbnailerJob { async fn get_files_by_extensions( db: &PrismaClient, - materialized_path: &MaterializedPath, + materialized_path: &MaterializedPath<'_>, extensions: &[Extension], kind: ThumbnailerJobStepKind, ) -> Result, JobError> { diff --git a/core/src/object/validation/validator_job.rs b/core/src/object/validation/validator_job.rs index 09990b9ae..a9f13863b 100644 --- a/core/src/object/validation/validator_job.rs +++ b/core/src/object/validation/validator_job.rs @@ -1,7 +1,7 @@ use crate::{ job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, library::Library, - location::file_path_helper::file_path_for_object_validator, + location::file_path_helper::{file_path_for_object_validator, MaterializedPath}, prisma::{file_path, location}, sync, }; @@ -94,7 
+94,11 @@ impl StatefulJob for ObjectValidatorJob { // we can also compare old and new checksums here // This if is just to make sure, we already queried objects where integrity_checksum is null if file_path.integrity_checksum.is_none() { - let checksum = file_checksum(data.root_path.join(&file_path.materialized_path)).await?; + let checksum = file_checksum(data.root_path.join(&MaterializedPath::from(( + file_path.location.id, + &file_path.materialized_path, + )))) + .await?; sync.write_op( db, diff --git a/core/src/sync/manager.rs b/core/src/sync/manager.rs index f7b19c95a..15667e8bd 100644 --- a/core/src/sync/manager.rs +++ b/core/src/sync/manager.rs @@ -1,7 +1,10 @@ use crate::prisma::*; -use sd_sync::*; -use serde_json::{from_value, json, to_vec, Value}; + use std::{collections::HashMap, sync::Arc}; + +use sd_sync::*; + +use serde_json::{from_value, json, to_vec, Value}; use tokio::sync::broadcast::{self, Receiver, Sender}; use uhlc::{HLCBuilder, HLC, NTP64}; use uuid::Uuid; @@ -220,6 +223,8 @@ impl SyncManager { }), ) .unwrap(), + serde_json::from_value(data.remove("inode").unwrap()).unwrap(), + serde_json::from_value(data.remove("device").unwrap()).unwrap(), data.into_iter() .flat_map(|(k, v)| file_path::SetParam::deserialize(&k, v)) .collect(), diff --git a/crates/ffmpeg/src/utils.rs b/crates/ffmpeg/src/utils.rs index e97f9745d..2165151a2 100644 --- a/crates/ffmpeg/src/utils.rs +++ b/crates/ffmpeg/src/utils.rs @@ -3,28 +3,27 @@ use std::ffi::CString; use std::path::Path; pub(crate) fn from_path(path: impl AsRef) -> Result { + let path = path.as_ref(); #[cfg(unix)] { use std::os::unix::ffi::OsStrExt; - CString::new(path.as_ref().as_os_str().as_bytes()) - .map_err(|_| ThumbnailerError::PathConversion(path.as_ref().to_path_buf())) + CString::new(path.as_os_str().as_bytes()) + .map_err(|_| ThumbnailerError::PathConversion(path.to_path_buf())) } #[cfg(windows)] { use std::os::windows::ffi::OsStrExt; CString::from_vec_with_nul( - path.as_ref() - .as_os_str() + 
path.as_os_str() .encode_wide() .chain(Some(0)) - .map(|b| { + .flat_map(|b| { let b = b.to_ne_bytes(); - b.get(0).map(|s| *s).into_iter().chain(b.get(1).map(|s| *s)) + b.first().copied().into_iter().chain(b.get(1).copied()) }) - .flatten() .collect::>(), ) - .map_err(|_| ThumbnailerError::PathConversion(path.as_ref().to_path_buf())) + .map_err(|_| ThumbnailerError::PathConversion(path.to_path_buf())) } } diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index e40cd0e8e..da9933b0c 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -4,7 +4,7 @@ export type Procedures = { queries: { key: "buildInfo", input: never, result: BuildInfo } | - { key: "files.get", input: LibraryArgs, result: { id: number, pub_id: number[], name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string, file_paths: FilePath[], media_data: MediaData | null } | null } | + { key: "files.get", input: LibraryArgs, result: { id: number, pub_id: number[], kind: number, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, file_paths: FilePath[], media_data: MediaData | null } | null } | { key: "jobs.getHistory", input: LibraryArgs, result: JobReport[] } | { key: "jobs.getRunning", input: LibraryArgs, result: JobReport[] } | { key: "jobs.isRunning", input: LibraryArgs, result: boolean } | @@ -79,7 +79,7 @@ export type Procedures = { subscriptions: { key: "invalidateQuery", input: never, result: InvalidateOperationEvent } | { key: "jobs.newThumbnail", input: LibraryArgs, result: string } | - { key: 
"locations.online", input: never, result: number[][] } | + { key: "locations.online", input: never, result: string[] } | { key: "p2p.events", input: never, result: P2PEvent } | { key: "sync.newMessage", input: LibraryArgs, result: CRDTOperation } }; @@ -134,7 +134,7 @@ export type FileEncryptorJobInit = { location_id: number, path_id: number, key_u export type FileEraserJobInit = { location_id: number, path_id: number, passes: string } -export type FilePath = { id: number, is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string } +export type FilePath = { id: number, is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, size_in_bytes: string, inode: number[], device: number[], object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string } export type GenerateThumbsForLocationArgs = { id: number, path: string } @@ -224,7 +224,7 @@ export type NodeState = (({ version: string | null }) & { id: string, name: stri */ export type Nonce = { XChaCha20Poly1305: number[] } | { Aes256Gcm: number[] } -export type Object = { id: number, pub_id: number[], name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string } +export type Object = { id: number, pub_id: number[], kind: number, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: 
boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string } export type ObjectValidatorArgs = { id: number, path: string } @@ -316,8 +316,8 @@ export type UnlockKeyManagerArgs = { password: string, secret_key: string } export type Volume = { name: string, mount_point: string, total_capacity: string, available_capacity: string, is_removable: boolean, disk_type: string | null, file_system: string | null, is_root_filesystem: boolean } -export type file_path_with_object = { id: number, is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, object: Object | null } +export type file_path_with_object = { id: number, is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, size_in_bytes: string, inode: number[], device: number[], object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, object: Object | null } export type location_with_indexer_rules = { id: number, pub_id: number[], node_id: number, name: string, path: string, total_capacity: number | null, available_capacity: number | null, is_archived: boolean, generate_preview_media: boolean, sync_preview_media: boolean, hidden: boolean, date_created: string, indexer_rules: { indexer_rule: IndexerRule }[] } -export type object_with_file_paths = { id: number, pub_id: number[], name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | 
null, note: string | null, date_created: string, date_modified: string, date_indexed: string, file_paths: FilePath[] } +export type object_with_file_paths = { id: number, pub_id: number[], kind: number, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, file_paths: FilePath[] }