Eng 372 location awareness improvements (#612)

* Complying with a pedantic Clippy

* Solving duplicated directories events
When creating a directory through MacOS's Finder, for some reason
FSEvents receives 2 Create Folder events that we have to handle

* Handling moving to trash bin and restoring on Mac
Still missing the feature to restore a directory and its children

* Now handling creation of empty files on MacOS

* Enabling restore of directories and their children

* Now working: moving a directory to another
inside the same location

* Now Indexer also removes file_paths not on fs

* Enabling multiple file moves on location watcher

* Fix Windows conditional compilation issues

* Fixing cas_id generation and bumping some deps

* Many Windows-specific improvements and some refactors

* Rust fmt

* Using conditional compilation on extract inode function

* Linux fixes and some MaterializedPath improvements

* Rust fmt again

* Introducing tick behavior on location watchers

* Making LastFilePathIdManager more atomic

* Some vscode launch configs for lldb debugger

* Simplifying some lifetimes

* Making all watchers more consistent
This commit is contained in:
Ericson "Fogo" Soares 2023-03-31 01:59:33 -03:00 committed by GitHub
parent 7d996a10cc
commit b346e7ac52
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 1841 additions and 770 deletions

80
.vscode/launch.json vendored Normal file
View file

@ -0,0 +1,80 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
// Debug the Tauri desktop app in development mode: cargo builds an
// unoptimized binary while the vite dev server (started by "ui:dev")
// serves the frontend.
{
"type": "lldb",
"request": "launch",
"name": "Tauri Development Debug",
"cargo": {
"args": [
"build",
"--manifest-path=./apps/desktop/src-tauri/Cargo.toml",
"--no-default-features"
],
// NOTE(review): "problemMatcher" is a tasks.json property; VS Code may
// flag it as not allowed inside a launch config's "cargo" object —
// confirm against the CodeLLDB documentation.
"problemMatcher": "$rustc",
},
"sourceLanguages": [
"rust"
],
// Runs the "ui:dev" task from tasks.json before attaching the debugger.
"preLaunchTask": "ui:dev"
},
// Same as above but with a --release build and a pre-built (bundled) UI.
{
"type": "lldb",
"request": "launch",
"name": "Tauri Production Debug",
"cargo": {
"args": [
"build",
"--release",
"--manifest-path=./apps/desktop/src-tauri/Cargo.toml"
],
"problemMatcher": "$rustc",
},
"sourceLanguages": [
"rust"
],
// Runs the "ui:build" task from tasks.json before attaching the debugger.
"preLaunchTask": "ui:build"
},
// Debug the sd-core library's unit tests: cargo compiles the test binary
// (--no-run) and CodeLLDB launches it under the debugger.
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in library 'sd-core'",
"cargo": {
"args": [
"test",
"--no-run",
"--lib",
"--package=sd-core"
],
// Selects which compiled artifact to debug among cargo's outputs.
"filter": {
"name": "sd-core",
"kind": "lib"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
// Debug the sd-crypto library's unit tests, same mechanism as sd-core.
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in library 'sd-crypto'",
"cargo": {
"args": [
"test",
"--no-run",
"--lib",
"--package=sd-crypto"
],
"filter": {
"name": "sd-crypto",
"kind": "lib"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
]
}

59
.vscode/tasks.json vendored
View file

@ -4,20 +4,28 @@
{
"type": "cargo",
"command": "clippy",
"problemMatcher": ["$rustc"],
"problemMatcher": [
"$rustc"
],
"group": {
"kind": "build",
"isDefault": true
},
"label": "rust: cargo clippy",
"args": ["--all-targets", "--all-features", "--all"]
"args": [
"--all-targets",
"--all-features",
"--all"
]
},
{
"type": "npm",
"script": "prep",
"label": "pnpm: prep",
"group": "none",
"problemMatcher": ["$rustc"]
"problemMatcher": [
"$rustc"
]
},
{
"type": "shell",
@ -32,7 +40,12 @@
},
"isBackground": true,
"command": "pnpm",
"args": ["desktop", "vite", "--clearScreen=false", "--mode=development"],
"args": [
"desktop",
"vite",
"--clearScreen=false",
"--mode=development"
],
"runOptions": {
"instanceLimit": 1
}
@ -42,31 +55,49 @@
"label": "ui:build",
"problemMatcher": "$tsc",
"command": "pnpm",
"args": ["desktop", "vite", "build"]
"args": [
"desktop",
"vite",
"build"
]
},
{
"type": "cargo",
"command": "run",
"args": ["--manifest-path=./apps/desktop/src-tauri/Cargo.toml", "--no-default-features"],
"args": [
"--manifest-path=./apps/desktop/src-tauri/Cargo.toml",
"--no-default-features"
],
"env": {
"RUST_BACKTRACE": "short" // Change this if you want more or less backtrace
"RUST_BACKTRACE": "short"
},
"problemMatcher": ["$rustc"],
"problemMatcher": [
"$rustc"
],
"group": "build",
"label": "rust: run spacedrive",
"dependsOn": ["ui:dev"]
"dependsOn": [
"ui:dev"
]
},
{
"type": "cargo",
"command": "run",
"args": ["--manifest-path=./apps/desktop/src-tauri/Cargo.toml", "--release"],
"args": [
"--manifest-path=./apps/desktop/src-tauri/Cargo.toml",
"--release"
],
"env": {
"RUST_BACKTRACE": "short" // Change this if you want more or less backtrace
"RUST_BACKTRACE": "short"
},
"problemMatcher": ["$rustc"],
"problemMatcher": [
"$rustc"
],
"group": "build",
"label": "rust: run spacedrive release",
"dependsOn": ["ui:build"]
"dependsOn": [
"ui:build"
]
}
]
}
}

8
Cargo.lock generated
View file

@ -6626,7 +6626,7 @@ version = "0.1.0"
dependencies = [
"async-stream",
"async-trait",
"base64 0.13.1",
"base64 0.21.0",
"blake3",
"chrono",
"ctor",
@ -6658,6 +6658,7 @@ dependencies = [
"serde_json",
"serde_with 2.2.0",
"specta",
"static_assertions",
"sysinfo",
"tempfile",
"thiserror",
@ -6668,6 +6669,7 @@ dependencies = [
"uhlc",
"uuid 1.2.1",
"webp",
"winapi-util",
]
[[package]]
@ -7673,9 +7675,9 @@ dependencies = [
[[package]]
name = "sysinfo"
version = "0.26.4"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7890fff842b8db56f2033ebee8f6efe1921475c3830c115995552914fb967580"
checksum = "f69e0d827cce279e61c2f3399eb789271a8f136d8245edef70f06e3c9601a670"
dependencies = [
"cfg-if",
"core-foundation-sys",

View file

@ -6,7 +6,7 @@ authors = ["Spacedrive Technology Inc."]
license = "GNU GENERAL PUBLIC LICENSE"
repository = "https://github.com/spacedriveapp/spacedrive"
edition = "2021"
rust-version = "1.67.0"
rust-version = "1.68.1"
[features]
default = []
@ -42,7 +42,7 @@ tokio = { workspace = true, features = [
"time",
] }
base64 = "0.13.0"
base64 = "0.21.0"
serde = { version = "1.0", features = ["derive"] }
chrono = { version = "0.4.22", features = ["serde"] }
serde_json = "1.0"
@ -53,7 +53,7 @@ rmp-serde = "^1.1.1"
blake3 = "1.3.1"
hostname = "0.3.1"
uuid = { version = "1.1.2", features = ["v4", "serde"] }
sysinfo = "0.26.4"
sysinfo = "0.28.3"
thiserror = "1.0.37"
include_dir = { version = "0.7.2", features = ["glob"] }
async-trait = "^0.1.57"
@ -76,6 +76,10 @@ ffmpeg-next = { version = "5.1.1", optional = true, features = [] }
notify = { version = "5.0.0", default-features = false, features = [
"macos_fsevent",
], optional = true }
static_assertions = "1.1.0"
[target.'cfg(windows)'.dependencies.winapi-util]
version = "0.1.5"
[dev-dependencies]
tempfile = "^3.3.0"

View file

@ -64,8 +64,8 @@ CREATE TABLE "location" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"node_id" INTEGER NOT NULL,
"name" TEXT,
"local_path" TEXT,
"name" TEXT NOT NULL,
"path" TEXT NOT NULL,
"total_capacity" INTEGER,
"available_capacity" INTEGER,
"is_archived" BOOLEAN NOT NULL DEFAULT false,
@ -80,10 +80,7 @@ CREATE TABLE "location" (
CREATE TABLE "object" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"name" TEXT,
"extension" TEXT COLLATE NOCASE,
"kind" INTEGER NOT NULL DEFAULT 0,
"size_in_bytes" TEXT NOT NULL DEFAULT '0',
"key_id" INTEGER,
"hidden" BOOLEAN NOT NULL DEFAULT false,
"favorite" BOOLEAN NOT NULL DEFAULT false,
@ -94,8 +91,6 @@ CREATE TABLE "object" (
"ipfs_id" TEXT,
"note" TEXT,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_indexed" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "object_key_id_fkey" FOREIGN KEY ("key_id") REFERENCES "key" ("id") ON DELETE SET NULL ON UPDATE CASCADE
);
@ -109,6 +104,9 @@ CREATE TABLE "file_path" (
"materialized_path" TEXT NOT NULL,
"name" TEXT NOT NULL,
"extension" TEXT COLLATE NOCASE NOT NULL,
"size_in_bytes" TEXT NOT NULL DEFAULT '0',
"inode" BLOB NOT NULL,
"device" BLOB NOT NULL,
"object_id" INTEGER,
"parent_id" INTEGER,
"key_id" INTEGER,
@ -320,6 +318,9 @@ CREATE INDEX "file_path_location_id_idx" ON "file_path"("location_id");
-- CreateIndex
CREATE UNIQUE INDEX "file_path_location_id_materialized_path_name_extension_key" ON "file_path"("location_id", "materialized_path", "name", "extension");
-- CreateIndex
CREATE UNIQUE INDEX "file_path_location_id_inode_device_key" ON "file_path"("location_id", "inode", "device");
-- CreateIndex
CREATE UNIQUE INDEX "file_conflict_original_object_id_key" ON "file_conflict"("original_object_id");

View file

@ -134,6 +134,11 @@ model FilePath {
name String
extension String
size_in_bytes String @default("0")
inode Bytes // This is actually an unsigned 64 bit integer, but we don't have this type in SQLite
device Bytes // This is actually an unsigned 64 bit integer, but we don't have this type in SQLite
// the unique Object for this file path
object_id Int?
object Object? @relation(fields: [object_id], references: [id], onDelete: Restrict)
@ -155,6 +160,7 @@ model FilePath {
@@id([location_id, id])
@@unique([location_id, materialized_path, name, extension])
@@unique([location_id, inode, device])
@@index([location_id])
@@map("file_path")
}
@ -163,12 +169,8 @@ model FilePath {
model Object {
id Int @id @default(autoincrement())
pub_id Bytes @unique
// basic metadata
name String?
// Must have 'COLLATE NOCASE' in migration
extension String?
kind Int @default(0)
size_in_bytes String @default("0")
key_id Int?
// handy ways to mark an object
hidden Boolean @default(false)
@ -188,10 +190,6 @@ model Object {
note String?
// the original known creation date of this object
date_created DateTime @default(now())
// the last time this object was modified
date_modified DateTime @default(now())
// when this object was first indexed
date_indexed DateTime @default(now())
tags TagOnObject[]
labels LabelOnObject[]

View file

@ -8,7 +8,7 @@ use crate::{
prisma::{file_path, indexer_rule, indexer_rules_in_location, location, object, tag},
};
use std::path::PathBuf;
use std::path::{PathBuf, MAIN_SEPARATOR, MAIN_SEPARATOR_STR};
use rspc::{self, ErrorCode, RouterBuilderLike, Type};
use serde::{Deserialize, Serialize};
@ -87,8 +87,8 @@ pub(crate) fn mount() -> impl RouterBuilderLike<Ctx> {
.await?
.ok_or(LocationError::IdNotFound(args.location_id))?;
if !args.path.ends_with('/') {
args.path += "/";
if !args.path.ends_with(MAIN_SEPARATOR) {
args.path += MAIN_SEPARATOR_STR;
}
let directory = db

View file

@ -51,20 +51,7 @@ pub(crate) fn mount() -> RouterBuilder {
let mut items = Vec::with_capacity(objects.len());
for mut object in objects {
// sorry brendan
// grab the first path and tac on the name
let oldest_path = &object.file_paths[0];
object.name = Some(oldest_path.name.clone());
object.extension = if oldest_path.extension.is_empty() {
None
} else {
Some(oldest_path.extension.clone())
};
// a long term fix for this would be to have the indexer give the Object
// a name and extension, sacrificing its own and only store newly found Path
// names that differ from the Object name
for object in objects {
let cas_id = object
.file_paths
.iter()

View file

@ -1,4 +1,4 @@
use crate::{prisma::file_path, Node};
use crate::{location::file_path_helper::MaterializedPath, prisma::file_path, Node};
use std::{
cmp::min,
@ -125,7 +125,10 @@ async fn handle_file(
.ok_or_else(|| HandleCustomUriError::NotFound("object"))?;
let lru_entry = (
Path::new(&file_path.location.path).join(&file_path.materialized_path),
Path::new(&file_path.location.path).join(&MaterializedPath::from((
location_id,
&file_path.materialized_path,
))),
file_path.extension,
);
FILE_METADATA_CACHE.insert(lru_cache_key, lru_entry.clone());

View file

@ -1,5 +1,3 @@
use crate::LocationManagerError;
use std::path::PathBuf;
use rspc::{self, ErrorCode};
@ -7,7 +5,9 @@ use thiserror::Error;
use tokio::io;
use uuid::Uuid;
use super::{file_path_helper::FilePathError, metadata::LocationMetadataError};
use super::{
file_path_helper::FilePathError, manager::LocationManagerError, metadata::LocationMetadataError,
};
/// Error type for location related errors
#[derive(Error, Debug)]

View file

@ -1,11 +1,10 @@
use crate::prisma::{
file_path::{self, FindMany},
location, PrismaClient,
};
use crate::prisma::{file_path, location, PrismaClient};
use std::{
borrow::Cow,
fmt::{Display, Formatter},
path::{Path, PathBuf},
fs::Metadata,
path::{Path, PathBuf, MAIN_SEPARATOR, MAIN_SEPARATOR_STR},
};
use dashmap::{mapref::entry::Entry, DashMap};
@ -47,15 +46,15 @@ file_path::select!(file_path_just_materialized_path_cas_id {
file_path::include!(file_path_with_object { object });
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MaterializedPath {
pub(super) materialized_path: String,
pub struct MaterializedPath<'a> {
pub(super) materialized_path: Cow<'a, str>,
pub(super) is_dir: bool,
pub(super) location_id: LocationId,
pub(super) name: String,
pub(super) extension: String,
pub(super) name: Cow<'a, str>,
pub(super) extension: Cow<'a, str>,
}
impl MaterializedPath {
impl MaterializedPath<'static> {
pub fn new(
location_id: LocationId,
location_path: impl AsRef<Path>,
@ -63,14 +62,15 @@ impl MaterializedPath {
is_dir: bool,
) -> Result<Self, FilePathError> {
let full_path = full_path.as_ref();
let mut materialized_path =
let mut materialized_path = format!(
"{MAIN_SEPARATOR_STR}{}",
extract_materialized_path(location_id, location_path, full_path)?
.to_str()
.expect("Found non-UTF-8 path")
.to_string();
);
if is_dir && !materialized_path.ends_with('/') {
materialized_path += "/";
if is_dir && !materialized_path.ends_with(MAIN_SEPARATOR) {
materialized_path += MAIN_SEPARATOR_STR;
}
let extension = if !is_dir {
@ -96,78 +96,117 @@ impl MaterializedPath {
};
Ok(Self {
materialized_path,
materialized_path: Cow::Owned(materialized_path),
is_dir,
location_id,
name: Self::prepare_name(full_path),
extension,
name: Cow::Owned(Self::prepare_name(full_path).to_string()),
extension: Cow::Owned(extension),
})
}
}
impl<'a> MaterializedPath<'a> {
pub fn location_id(&self) -> LocationId {
self.location_id
}
fn prepare_name(path: &Path) -> String {
fn prepare_name(path: &Path) -> &str {
// Not using `impl AsRef<Path>` here because it's an private method
path.file_name()
path.file_stem()
.unwrap_or_default()
.to_str()
.unwrap_or_default()
.to_string()
}
pub fn parent(&self) -> Self {
let parent_path = Path::new(&self.materialized_path)
let parent_path = Path::new(self.materialized_path.as_ref())
.parent()
.unwrap_or_else(|| Path::new("/"));
.unwrap_or_else(|| Path::new(MAIN_SEPARATOR_STR));
let mut parent_path_str = parent_path
.to_str()
.unwrap() // SAFETY: This unwrap is ok because this path was a valid UTF-8 String before
.to_string();
if !parent_path_str.ends_with('/') {
parent_path_str += "/";
if !parent_path_str.ends_with(MAIN_SEPARATOR) {
parent_path_str += MAIN_SEPARATOR_STR;
}
Self {
materialized_path: parent_path_str,
materialized_path: Cow::Owned(parent_path_str),
is_dir: true,
location_id: self.location_id,
// NOTE: This way we don't use the same name for "/" `file_path`, that uses the location
// name in the database, check later if this is a problem
name: Self::prepare_name(parent_path),
extension: String::new(),
name: Cow::Owned(Self::prepare_name(parent_path).to_string()),
extension: Cow::Owned(String::new()),
}
}
}
impl From<MaterializedPath> for String {
impl<'a, S: AsRef<str> + 'a> From<(LocationId, &'a S)> for MaterializedPath<'a> {
fn from((location_id, materialized_path): (LocationId, &'a S)) -> Self {
let materialized_path = materialized_path.as_ref();
let is_dir = materialized_path.ends_with(MAIN_SEPARATOR);
let length = materialized_path.len();
let (name, extension) = if length == 1 {
// The case for the root path
(materialized_path, "")
} else if is_dir {
let first_name_char = materialized_path[..(length - 1)]
.rfind(MAIN_SEPARATOR)
.unwrap_or(0) + 1;
(&materialized_path[first_name_char..(length - 1)], "")
} else {
let first_name_char = materialized_path.rfind(MAIN_SEPARATOR).unwrap_or(0) + 1;
if let Some(last_dot_relative_idx) = materialized_path[first_name_char..].rfind('.') {
let last_dot_idx = first_name_char + last_dot_relative_idx;
(
&materialized_path[first_name_char..last_dot_idx],
&materialized_path[last_dot_idx + 1..],
)
} else {
(&materialized_path[first_name_char..], "")
}
};
Self {
materialized_path: Cow::Borrowed(materialized_path),
location_id,
is_dir,
name: Cow::Borrowed(name),
extension: Cow::Borrowed(extension),
}
}
}
impl From<MaterializedPath<'_>> for String {
fn from(path: MaterializedPath) -> Self {
path.materialized_path
path.materialized_path.into_owned()
}
}
impl From<&MaterializedPath> for String {
impl From<&MaterializedPath<'_>> for String {
fn from(path: &MaterializedPath) -> Self {
path.materialized_path.clone()
path.materialized_path.to_string()
}
}
impl AsRef<str> for MaterializedPath {
impl AsRef<str> for MaterializedPath<'_> {
fn as_ref(&self) -> &str {
self.materialized_path.as_ref()
}
}
impl AsRef<Path> for MaterializedPath {
impl AsRef<Path> for &MaterializedPath<'_> {
fn as_ref(&self) -> &Path {
Path::new(&self.materialized_path)
// Skipping / because it's not a valid path to be joined
Path::new(&self.materialized_path[1..])
}
}
impl Display for MaterializedPath {
impl Display for MaterializedPath<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.materialized_path)
}
@ -175,6 +214,8 @@ impl Display for MaterializedPath {
#[derive(Error, Debug)]
pub enum FilePathError {
#[error("File Path not found: <path={0}>")]
NotFound(PathBuf),
#[error("Received an invalid sub path: <location_path={location_path}, sub_path={sub_path}>")]
InvalidSubPath {
location_path: PathBuf,
@ -213,27 +254,40 @@ impl LastFilePathIdManager {
Default::default()
}
pub async fn get_max_file_path_id(
pub async fn sync(
&self,
location_id: LocationId,
db: &PrismaClient,
) -> Result<(), FilePathError> {
if let Some(mut id_ref) = self.last_id_by_location.get_mut(&location_id) {
*id_ref = Self::fetch_max_file_path_id(location_id, db).await?;
}
Ok(())
}
pub async fn increment(
&self,
location_id: LocationId,
by: i32,
db: &PrismaClient,
) -> Result<i32, FilePathError> {
Ok(match self.last_id_by_location.entry(location_id) {
Entry::Occupied(entry) => *entry.get(),
Entry::Occupied(mut entry) => {
let first_free_id = *entry.get() + 1;
*entry.get_mut() += by + 1;
first_free_id
}
Entry::Vacant(entry) => {
// I wish I could use `or_try_insert_with` method instead of this crappy match,
// but we don't have async closures yet ):
let id = Self::fetch_max_file_path_id(location_id, db).await?;
entry.insert(id);
id
let first_free_id = Self::fetch_max_file_path_id(location_id, db).await? + 1;
entry.insert(first_free_id + by);
first_free_id
}
})
}
pub async fn set_max_file_path_id(&self, location_id: LocationId, id: i32) {
self.last_id_by_location.insert(location_id, id);
}
async fn fetch_max_file_path_id(
location_id: LocationId,
db: &PrismaClient,
@ -259,8 +313,10 @@ impl LastFilePathIdManager {
location_id,
name,
extension,
}: MaterializedPath,
}: MaterializedPath<'_>,
parent_id: Option<i32>,
inode: u64,
device: u64,
) -> Result<file_path::Data, FilePathError> {
// Keeping a reference in that map for the entire duration of the function, so we keep it locked
let mut last_id_ref = match self.last_id_by_location.entry(location_id) {
@ -278,9 +334,11 @@ impl LastFilePathIdManager {
.create(
next_id,
location::id::equals(location_id),
materialized_path,
name,
extension,
materialized_path.into_owned(),
name.into_owned(),
extension.into_owned(),
inode.to_le_bytes().into(),
device.to_le_bytes().into(),
vec![
file_path::parent_id::set(parent_id),
file_path::is_dir::set(is_dir),
@ -324,11 +382,10 @@ pub fn extract_materialized_path(
})
}
pub async fn find_many_file_paths_by_full_path<'db>(
pub async fn filter_file_paths_by_many_full_path_params(
location: &location::Data,
full_paths: &[impl AsRef<Path>],
db: &'db PrismaClient,
) -> Result<FindMany<'db>, FilePathError> {
) -> Result<Vec<file_path::WhereParam>, FilePathError> {
let is_dirs = try_join_all(
full_paths
.iter()
@ -345,101 +402,130 @@ pub async fn find_many_file_paths_by_full_path<'db>(
// Collecting in a Result, so we stop on the first error
.collect::<Result<Vec<_>, _>>()?;
Ok(db.file_path().find_many(vec![
Ok(vec![
file_path::location_id::equals(location.id),
file_path::materialized_path::in_vec(materialized_paths),
]))
])
}
#[cfg(feature = "location-watcher")]
pub async fn check_existing_file_path(
materialized_path: &MaterializedPath<'_>,
db: &PrismaClient,
) -> Result<bool, FilePathError> {
db.file_path()
.count(filter_existing_file_path_params(materialized_path))
.exec()
.await
.map_or_else(|e| Err(e.into()), |count| Ok(count > 0))
}
pub fn filter_existing_file_path_params(
MaterializedPath {
materialized_path,
is_dir,
location_id,
name,
extension,
}: &MaterializedPath,
) -> Vec<file_path::WhereParam> {
let mut params = vec![
file_path::location_id::equals(*location_id),
file_path::materialized_path::equals(materialized_path.to_string()),
file_path::is_dir::equals(*is_dir),
file_path::extension::equals(extension.to_string()),
];
// This is due to a limitation of MaterializedPath, where we don't know the location name to use
// as the file_path name at the root of the location "/" or "\" on Windows
if materialized_path != MAIN_SEPARATOR_STR {
params.push(file_path::name::equals(name.to_string()));
}
params
}
/// With this function we try to do a loose filtering of file paths, to avoid having to do check
/// twice for directories and for files. This is because directories have a trailing `/` or `\` in
/// the materialized path
pub fn loose_find_existing_file_path_params(
MaterializedPath {
materialized_path,
is_dir,
location_id,
name,
..
}: &MaterializedPath,
) -> Vec<file_path::WhereParam> {
let mut materialized_path_str = materialized_path.to_string();
if *is_dir {
materialized_path_str.pop();
}
let mut params = vec![
file_path::location_id::equals(*location_id),
file_path::materialized_path::starts_with(materialized_path_str),
];
// This is due to a limitation of MaterializedPath, where we don't know the location name to use
// as the file_path name at the root of the location "/" or "\" on Windows
if materialized_path != MAIN_SEPARATOR_STR {
params.push(file_path::name::equals(name.to_string()));
}
params
}
pub async fn get_existing_file_path_id(
materialized_path: MaterializedPath,
materialized_path: &MaterializedPath<'_>,
db: &PrismaClient,
) -> Result<Option<i32>, FilePathError> {
db.file_path()
.find_first(vec![
file_path::location_id::equals(materialized_path.location_id),
file_path::materialized_path::equals(materialized_path.into()),
])
.find_first(filter_existing_file_path_params(materialized_path))
.select(file_path::select!({ id }))
.exec()
.await
.map_or_else(|e| Err(e.into()), |r| Ok(r.map(|r| r.id)))
}
#[cfg(feature = "location-watcher")]
pub async fn get_existing_file_path(
materialized_path: MaterializedPath,
db: &PrismaClient,
) -> Result<Option<file_path::Data>, FilePathError> {
db.file_path()
.find_first(vec![
file_path::location_id::equals(materialized_path.location_id),
file_path::materialized_path::equals(materialized_path.into()),
])
.exec()
.await
.map_err(Into::into)
}
#[cfg(feature = "location-watcher")]
pub async fn get_existing_file_path_with_object(
materialized_path: MaterializedPath,
db: &PrismaClient,
) -> Result<Option<file_path_with_object::Data>, FilePathError> {
db.file_path()
.find_first(vec![
file_path::location_id::equals(materialized_path.location_id),
file_path::materialized_path::equals(materialized_path.into()),
])
// include object for orphan check
.include(file_path_with_object::include())
.exec()
.await
.map_err(Into::into)
}
#[cfg(feature = "location-watcher")]
pub async fn get_existing_file_or_directory(
location: &super::location_with_indexer_rules::Data,
path: impl AsRef<Path>,
db: &PrismaClient,
) -> Result<Option<file_path_with_object::Data>, FilePathError> {
let mut maybe_file_path = get_existing_file_path_with_object(
MaterializedPath::new(location.id, &location.path, path.as_ref(), false)?,
db,
)
.await?;
// First we just check if this path was a file in our db, if it isn't then we check for a directory
if maybe_file_path.is_none() {
maybe_file_path = get_existing_file_path_with_object(
MaterializedPath::new(location.id, &location.path, path.as_ref(), true)?,
db,
)
.await?;
}
Ok(maybe_file_path)
}
#[cfg(feature = "location-watcher")]
pub async fn get_parent_dir(
materialized_path: &MaterializedPath,
materialized_path: &MaterializedPath<'_>,
db: &PrismaClient,
) -> Result<Option<file_path::Data>, FilePathError> {
get_existing_file_path(materialized_path.parent(), db).await
db.file_path()
.find_first(filter_existing_file_path_params(
&materialized_path.parent(),
))
.exec()
.await
.map_err(Into::into)
}
#[cfg(feature = "location-watcher")]
pub async fn get_parent_dir_id(
materialized_path: &MaterializedPath<'_>,
db: &PrismaClient,
) -> Result<Option<i32>, FilePathError> {
get_existing_file_path_id(&materialized_path.parent(), db).await
}
pub async fn ensure_sub_path_is_in_location(
location_path: impl AsRef<Path>,
sub_path: impl AsRef<Path>,
) -> Result<PathBuf, FilePathError> {
let sub_path = sub_path.as_ref();
let mut sub_path = sub_path.as_ref();
if sub_path.starts_with(MAIN_SEPARATOR_STR) {
// SAFETY: we just checked that it starts with the separator
sub_path = sub_path.strip_prefix(MAIN_SEPARATOR_STR).unwrap();
}
let location_path = location_path.as_ref();
if !sub_path.starts_with(location_path) {
// If the sub_path doesn't start with the location_path, we have to check if it's a
// materialized path received from the frontend, then we check if the full path exists
let full_path = location_path.join(sub_path);
match fs::metadata(&full_path).await {
Ok(_) => Ok(full_path),
Err(e) if e.kind() == io::ErrorKind::NotFound => Err(FilePathError::InvalidSubPath {
@ -457,8 +543,7 @@ pub async fn ensure_sub_path_is_directory(
location_path: impl AsRef<Path>,
sub_path: impl AsRef<Path>,
) -> Result<(), FilePathError> {
let sub_path = sub_path.as_ref();
let location_path = location_path.as_ref();
let mut sub_path = sub_path.as_ref();
match fs::metadata(sub_path).await {
Ok(meta) => {
@ -469,6 +554,13 @@ pub async fn ensure_sub_path_is_directory(
}
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
if sub_path.starts_with(MAIN_SEPARATOR_STR) {
// SAFETY: we just checked that it starts with the separator
sub_path = sub_path.strip_prefix(MAIN_SEPARATOR_STR).unwrap();
}
let location_path = location_path.as_ref();
match fs::metadata(location_path.join(sub_path)).await {
Ok(meta) => {
if meta.is_file() {
@ -489,3 +581,83 @@ pub async fn ensure_sub_path_is_directory(
Err(e) => Err(e.into()),
}
}
pub async fn retain_file_paths_in_location(
location_id: LocationId,
to_retain: Vec<i32>,
maybe_parent_file_path: Option<file_path_just_id_materialized_path::Data>,
db: &PrismaClient,
) -> Result<i64, FilePathError> {
let mut to_delete_params = vec![
file_path::location_id::equals(location_id),
file_path::id::not_in_vec(to_retain),
];
if let Some(parent_file_path) = maybe_parent_file_path {
// If the parent_materialized_path is not the root path, we only delete file paths that start with the parent path
if parent_file_path.materialized_path != MAIN_SEPARATOR_STR {
to_delete_params.push(file_path::materialized_path::starts_with(
parent_file_path.materialized_path,
));
} else {
// If the parent_materialized_path is the root path, we fetch children using the parent id
to_delete_params.push(file_path::parent_id::equals(Some(parent_file_path.id)));
}
}
db.file_path()
.delete_many(to_delete_params)
.exec()
.await
.map_err(Into::into)
}
#[allow(unused)] // TODO remove this annotation when we can use it on windows
pub fn get_inode_and_device(metadata: &Metadata) -> Result<(u64, u64), FilePathError> {
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::MetadataExt;
Ok((metadata.ino(), metadata.dev()))
}
#[cfg(target_family = "windows")]
{
// TODO use this when it's stable and remove winapi-utils dependency
// use std::os::windows::fs::MetadataExt;
// Ok((
// metadata
// .file_index()
// .expect("This function must not be called from a `DirEntry`'s `Metadata"),
// metadata
// .volume_serial_number()
// .expect("This function must not be called from a `DirEntry`'s `Metadata") as u64,
// ))
todo!("Use metadata: {:#?}", metadata)
}
}
#[allow(unused)]
pub async fn get_inode_and_device_from_path(
path: impl AsRef<Path>,
) -> Result<(u64, u64), FilePathError> {
#[cfg(target_family = "unix")]
{
// TODO use this when it's stable and remove winapi-utils dependency
let metadata = fs::metadata(path.as_ref()).await?;
get_inode_and_device(&metadata)
}
#[cfg(target_family = "windows")]
{
use winapi_util::{file::information, Handle};
let info = information(&Handle::from_path_any(path.as_ref())?)?;
Ok((info.file_index(), info.volume_serial_number()))
}
}

View file

@ -3,8 +3,9 @@ use crate::{
library::Library,
location::file_path_helper::{
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
file_path_just_id_materialized_path, find_many_file_paths_by_full_path,
get_existing_file_path_id, MaterializedPath,
file_path_just_id_materialized_path, filter_existing_file_path_params,
filter_file_paths_by_many_full_path_params, retain_file_paths_in_location,
MaterializedPath,
},
prisma::location,
};
@ -54,13 +55,6 @@ impl StatefulJob for IndexerJob {
let location_id = state.init.location.id;
let location_path = Path::new(&state.init.location.path);
// grab the next id so we can increment in memory for batch inserting
let first_file_id = last_file_path_id_manager
.get_max_file_path_id(location_id, db)
.await
.map_err(IndexerError::from)?
+ 1;
let mut indexer_rules_by_kind: HashMap<RuleKind, Vec<IndexerRule>> =
HashMap::with_capacity(state.init.location.indexer_rules.len());
for location_rule in &state.init.location.indexer_rules {
@ -74,7 +68,8 @@ impl StatefulJob for IndexerJob {
let mut dirs_ids = HashMap::new();
let to_walk_path = if let Some(ref sub_path) = state.init.sub_path {
let (to_walk_path, maybe_parent_file_path) = if let Some(ref sub_path) = state.init.sub_path
{
let full_path = ensure_sub_path_is_in_location(location_path, sub_path)
.await
.map_err(IndexerError::from)?;
@ -82,21 +77,24 @@ impl StatefulJob for IndexerJob {
.await
.map_err(IndexerError::from)?;
let sub_path_file_path_id = get_existing_file_path_id(
MaterializedPath::new(location_id, location_path, &full_path, true)
.map_err(IndexerError::from)?,
db,
)
.await
.map_err(IndexerError::from)?
.expect("Sub path should already exist in the database");
let sub_path_file_path = db
.file_path()
.find_first(filter_existing_file_path_params(
&MaterializedPath::new(location_id, location_path, &full_path, true)
.map_err(IndexerError::from)?,
))
.select(file_path_just_id_materialized_path::select())
.exec()
.await
.map_err(IndexerError::from)?
.expect("Sub path should already exist in the database");
// If we're operating with a sub_path, then we have to put its id on `dirs_ids` map
dirs_ids.insert(full_path.clone(), sub_path_file_path_id);
dirs_ids.insert(full_path.clone(), sub_path_file_path.id);
full_path
(full_path, Some(sub_path_file_path))
} else {
location_path.to_path_buf()
(location_path.to_path_buf(), None)
};
let scan_start = Instant::now();
@ -118,29 +116,53 @@ impl StatefulJob for IndexerJob {
)
.await?;
// NOTE:
// As we're passing the list of currently existing file paths to the `find_many_file_paths_by_full_path` query,
// it means that `dirs_ids` contains just paths that still exists on the filesystem.
dirs_ids.extend(
find_many_file_paths_by_full_path(
&location::Data::from(&state.init.location),
&found_paths
.iter()
.map(|entry| &entry.path)
.collect::<Vec<_>>(),
db,
)
.await
.map_err(IndexerError::from)?
.select(file_path_just_id_materialized_path::select())
.exec()
.await?
.into_iter()
.map(|file_path| {
(
location_path.join(file_path.materialized_path),
file_path.id,
db.file_path()
.find_many(
filter_file_paths_by_many_full_path_params(
&location::Data::from(&state.init.location),
&found_paths
.iter()
.map(|entry| &entry.path)
.collect::<Vec<_>>(),
)
.await
.map_err(IndexerError::from)?,
)
}),
.select(file_path_just_id_materialized_path::select())
.exec()
.await?
.into_iter()
.map(|file_path| {
(
location_path.join(&MaterializedPath::from((
location_id,
&file_path.materialized_path,
))),
file_path.id,
)
}),
);
// Removing all other file paths that are not in the filesystem anymore
let removed_paths = retain_file_paths_in_location(
location_id,
dirs_ids.values().copied().collect(),
maybe_parent_file_path,
db,
)
.await
.map_err(IndexerError::from)?;
// Syncing the last file path id manager, as we potentially just removed a bunch of ids
last_file_path_id_manager
.sync(location_id, db)
.await
.map_err(IndexerError::from)?;
let mut new_paths = found_paths
.into_iter()
.filter_map(|entry| {
@ -169,6 +191,8 @@ impl StatefulJob for IndexerJob {
dirs_ids.get(parent_dir).copied()
}),
full_path: entry.path,
inode: entry.inode,
device: entry.device,
}
})
},
@ -177,16 +201,16 @@ impl StatefulJob for IndexerJob {
.collect::<Vec<_>>();
let total_paths = new_paths.len();
let last_file_id = first_file_id + total_paths as i32;
// Setting our global state for `file_path` ids
last_file_path_id_manager
.set_max_file_path_id(location_id, last_file_id)
.await;
// grab the next id so we can increment in memory for batch inserting
let first_file_id = last_file_path_id_manager
.increment(location_id, total_paths as i32, db)
.await
.map_err(IndexerError::from)?;
new_paths
.iter_mut()
.zip(first_file_id..last_file_id)
.zip(first_file_id..)
.for_each(|(entry, file_id)| {
// If the `parent_id` is still none here, is because the parent of this entry is also
// a new one in the DB
@ -205,6 +229,7 @@ impl StatefulJob for IndexerJob {
scan_read_time: scan_start.elapsed(),
total_paths,
indexed_paths: 0,
removed_paths,
});
state.steps = new_paths

View file

@ -59,6 +59,7 @@ pub struct IndexerJobData {
scan_read_time: Duration,
total_paths: usize,
indexed_paths: i64,
removed_paths: i64,
}
/// `IndexerJobStep` is a type alias, specifying that each step of the [`IndexerJob`] is a vector of
@ -70,10 +71,12 @@ pub type IndexerJobStep = Vec<IndexerJobStepEntry>;
#[derive(Serialize, Deserialize)]
pub struct IndexerJobStepEntry {
full_path: PathBuf,
materialized_path: MaterializedPath,
materialized_path: MaterializedPath<'static>,
created_at: DateTime<Utc>,
file_id: i32,
parent_id: Option<i32>,
inode: u64,
device: u64,
}
impl IndexerJobData {
@ -175,6 +178,8 @@ async fn execute_indexer_step(
("name", json!(name.clone())),
("is_dir", json!(is_dir)),
("extension", json!(extension.clone())),
("inode", json!(entry.inode)),
("device", json!(entry.device)),
("parent_id", json!(entry.parent_id)),
("date_created", json!(entry.created_at)),
],
@ -182,9 +187,11 @@ async fn execute_indexer_step(
file_path::create_unchecked(
entry.file_id,
location.id,
materialized_path,
name,
extension,
materialized_path.into_owned(),
name.into_owned(),
extension.into_owned(),
entry.inode.to_le_bytes().into(),
entry.device.to_le_bytes().into(),
vec![
is_dir::set(is_dir),
parent_id::set(entry.parent_id),
@ -224,7 +231,7 @@ where
.as_ref()
.expect("critical error: missing data on job state");
tracing::info!(
info!(
"scan of {} completed in {:?}. {} new files found, \
indexed {} files in db. db write completed in {:?}",
location_path.as_ref().display(),
@ -236,7 +243,7 @@ where
.expect("critical error: non-negative duration"),
);
if data.indexed_paths > 0 {
if data.indexed_paths > 0 || data.removed_paths > 0 {
invalidate_query!(ctx.library, "locations.getExplorerData");
}

View file

@ -3,8 +3,9 @@ use crate::{
library::Library,
location::file_path_helper::{
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
file_path_just_id_materialized_path, find_many_file_paths_by_full_path,
get_existing_file_path_id, MaterializedPath,
file_path_just_id_materialized_path, filter_existing_file_path_params,
filter_file_paths_by_many_full_path_params, retain_file_paths_in_location,
MaterializedPath,
},
prisma::location,
};
@ -74,13 +75,6 @@ impl StatefulJob for ShallowIndexerJob {
let location_id = state.init.location.id;
let location_path = Path::new(&state.init.location.path);
// grab the next id so we can increment in memory for batch inserting
let first_file_id = last_file_path_id_manager
.get_max_file_path_id(location_id, db)
.await
.map_err(IndexerError::from)?
+ 1;
let mut indexer_rules_by_kind: HashMap<RuleKind, Vec<IndexerRule>> =
HashMap::with_capacity(state.init.location.indexer_rules.len());
for location_rule in &state.init.location.indexer_rules {
@ -92,7 +86,7 @@ impl StatefulJob for ShallowIndexerJob {
.push(indexer_rule);
}
let (to_walk_path, parent_id) = if state.init.sub_path != Path::new("") {
let (to_walk_path, parent_file_path) = if state.init.sub_path != Path::new("") {
let full_path = ensure_sub_path_is_in_location(location_path, &state.init.sub_path)
.await
.map_err(IndexerError::from)?;
@ -100,28 +94,33 @@ impl StatefulJob for ShallowIndexerJob {
.await
.map_err(IndexerError::from)?;
let materialized_path =
MaterializedPath::new(location_id, location_path, &full_path, true)
.map_err(IndexerError::from)?;
(
location_path.join(&state.init.sub_path),
get_existing_file_path_id(
MaterializedPath::new(location_id, location_path, &full_path, true)
.map_err(IndexerError::from)?,
db,
)
.await
.map_err(IndexerError::from)?
.expect("Sub path should already exist in the database"),
full_path,
db.file_path()
.find_first(filter_existing_file_path_params(&materialized_path))
.select(file_path_just_id_materialized_path::select())
.exec()
.await
.map_err(IndexerError::from)?
.expect("Sub path should already exist in the database"),
)
} else {
(
location_path.to_path_buf(),
get_existing_file_path_id(
MaterializedPath::new(location_id, location_path, location_path, true)
.map_err(IndexerError::from)?,
db,
)
.await
.map_err(IndexerError::from)?
.expect("Location root path should already exist in the database"),
db.file_path()
.find_first(filter_existing_file_path_params(
&MaterializedPath::new(location_id, location_path, location_path, true)
.map_err(IndexerError::from)?,
))
.select(file_path_just_id_materialized_path::select())
.exec()
.await
.map_err(IndexerError::from)?
.expect("Location root path should already exist in the database"),
)
};
@ -141,22 +140,42 @@ impl StatefulJob for ShallowIndexerJob {
)
.await?;
let already_existing_file_paths = find_many_file_paths_by_full_path(
&location::Data::from(&state.init.location),
&found_paths
.iter()
.map(|entry| &entry.path)
.collect::<Vec<_>>(),
db,
)
.await
.map_err(IndexerError::from)?
.select(file_path_just_id_materialized_path::select())
.exec()
.await?
.into_iter()
.map(|file_path| file_path.materialized_path)
.collect::<HashSet<_>>();
let (already_existing_file_paths, mut to_retain) = db
.file_path()
.find_many(
filter_file_paths_by_many_full_path_params(
&location::Data::from(&state.init.location),
&found_paths
.iter()
.map(|entry| &entry.path)
.collect::<Vec<_>>(),
)
.await
.map_err(IndexerError::from)?,
)
.select(file_path_just_id_materialized_path::select())
.exec()
.await?
.into_iter()
.map(|file_path| (file_path.materialized_path, file_path.id))
.unzip::<_, _, HashSet<_>, Vec<_>>();
let parent_id = parent_file_path.id;
// Adding our parent path id
to_retain.push(parent_id);
// Removing all other file paths that are not in the filesystem anymore
let removed_paths =
retain_file_paths_in_location(location_id, to_retain, Some(parent_file_path), db)
.await
.map_err(IndexerError::from)?;
// Syncing the last file path id manager, as we potentially just removed a bunch of ids
last_file_path_id_manager
.sync(location_id, db)
.await
.map_err(IndexerError::from)?;
// Filter out paths that are already in the databases
let mut new_paths = found_paths
@ -177,6 +196,8 @@ impl StatefulJob for ShallowIndexerJob {
created_at: entry.created_at,
file_id: 0, // To be set later
parent_id: Some(parent_id),
inode: entry.inode,
device: entry.device,
})
},
)
@ -186,16 +207,16 @@ impl StatefulJob for ShallowIndexerJob {
.collect::<Vec<_>>();
let total_paths = new_paths.len();
let last_file_id = first_file_id + total_paths as i32;
// Setting our global state for file_path ids
last_file_path_id_manager
.set_max_file_path_id(location_id, last_file_id)
.await;
// grab the next id so we can increment in memory for batch inserting
let first_file_id = last_file_path_id_manager
.increment(location_id, total_paths as i32, db)
.await
.map_err(IndexerError::from)?;
new_paths
.iter_mut()
.zip(first_file_id..last_file_id)
.zip(first_file_id..)
.for_each(|(entry, file_id)| {
entry.file_id = file_id;
});
@ -207,6 +228,7 @@ impl StatefulJob for ShallowIndexerJob {
scan_read_time: scan_start.elapsed(),
total_paths,
indexed_paths: 0,
removed_paths,
});
state.steps = new_paths

View file

@ -1,10 +1,17 @@
use chrono::{DateTime, Utc};
#[cfg(target_family = "unix")]
use crate::location::file_path_helper::get_inode_and_device;
#[cfg(target_family = "windows")]
use crate::location::file_path_helper::get_inode_and_device_from_path;
use std::{
cmp::Ordering,
collections::{HashMap, VecDeque},
hash::{Hash, Hasher},
path::{Path, PathBuf},
};
use chrono::{DateTime, Utc};
use tokio::fs;
use tracing::{error, trace};
@ -20,6 +27,8 @@ pub(super) struct WalkEntry {
pub(super) path: PathBuf,
pub(super) is_dir: bool,
pub(super) created_at: DateTime<Utc>,
pub(super) inode: u64,
pub(super) device: u64,
}
impl PartialEq for WalkEntry {
@ -137,7 +146,7 @@ async fn inner_walk_single_dir(
);
if let Some(reject_rules) = rules_per_kind.get(&RuleKind::RejectFilesByGlob) {
for reject_rule in reject_rules {
// It's ok to unwrap here, reject rules are infallible
// SAFETY: It's ok to unwrap here, reject rules of this kind are infallible
if !reject_rule.apply(&current_path).await.unwrap() {
trace!(
"Path {} rejected by rule {}",
@ -158,6 +167,27 @@ async fn inner_walk_single_dir(
let is_dir = metadata.is_dir();
let (inode, device) = match {
#[cfg(target_family = "unix")]
{
get_inode_and_device(&metadata)
}
#[cfg(target_family = "windows")]
{
get_inode_and_device_from_path(&current_path).await
}
} {
Ok(inode_and_device) => inode_and_device,
Err(e) => {
error!(
"Error getting inode and device for {}: {e}",
current_path.display(),
);
continue 'entries;
}
};
if is_dir {
// If it is a directory, first we check if we must reject it and its children entirely
if let Some(reject_by_children_rules) =
@ -259,6 +289,8 @@ async fn inner_walk_single_dir(
path: current_path.clone(),
is_dir,
created_at: metadata.created()?.into(),
inode,
device,
},
);
@ -270,12 +302,27 @@ async fn inner_walk_single_dir(
{
trace!("Indexing ancestor {}", ancestor.display());
if !indexed_paths.contains_key(ancestor) {
let metadata = fs::metadata(ancestor).await?;
let (inode, device) = {
#[cfg(target_family = "unix")]
{
get_inode_and_device(&metadata)?
}
#[cfg(target_family = "windows")]
{
get_inode_and_device_from_path(ancestor).await?
}
};
indexed_paths.insert(
ancestor.to_path_buf(),
WalkEntry {
path: ancestor.to_path_buf(),
is_dir: true,
created_at: fs::metadata(ancestor).await?.created()?.into(),
created_at: metadata.created()?.into(),
inode,
device,
},
);
} else {
@ -299,11 +346,24 @@ async fn prepared_indexed_paths(
if include_root {
// Also adding the root location path
let root_created_at = fs::metadata(&root).await?.created()?.into();
let metadata = fs::metadata(&root).await?;
let (inode, device) = {
#[cfg(target_family = "unix")]
{
get_inode_and_device(&metadata)?
}
#[cfg(target_family = "windows")]
{
get_inode_and_device_from_path(&root).await?
}
};
indexed_paths.push(WalkEntry {
path: root,
is_dir: true,
created_at: root_created_at,
created_at: metadata.created()?.into(),
inode,
device,
});
}
@ -410,33 +470,35 @@ mod tests {
let root = prepare_location().await;
let root_path = root.path();
let any_datetime = Utc::now();
let created_at = Utc::now();
let inode = 0;
let device = 0;
#[rustfmt::skip]
let expected = [
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/target"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/target/debug"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/target/debug/main"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("inner"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/node_modules"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react/package.json"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("photos"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("photos/photo1.png"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("photos/photo2.jpg"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("photos/photo3.jpeg"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("photos/text.txt"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/target"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/target/debug"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/target/debug/main"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("inner"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/node_modules"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react/package.json"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("photos"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("photos/photo1.png"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("photos/photo2.jpg"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("photos/photo3.jpeg"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("photos/text.txt"), is_dir: false, created_at, inode, device },
]
.into_iter()
.collect::<BTreeSet<_>>();
@ -456,15 +518,17 @@ mod tests {
let root = prepare_location().await;
let root_path = root.path();
let any_datetime = Utc::now();
let created_at = Utc::now();
let inode = 0;
let device = 0;
#[rustfmt::skip]
let expected = [
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("photos"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("photos/photo1.png"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("photos/photo2.jpg"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("photos/photo3.jpeg"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("photos"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("photos/photo1.png"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("photos/photo2.jpg"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("photos/photo3.jpeg"), is_dir: false, created_at, inode, device },
]
.into_iter()
.collect::<BTreeSet<_>>();
@ -495,28 +559,30 @@ mod tests {
let root = prepare_location().await;
let root_path = root.path();
let any_datetime = Utc::now();
let created_at = Utc::now();
let inode = 0;
let device = 0;
#[rustfmt::skip]
let expected = [
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/target"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/target/debug"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/target/debug/main"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("inner"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/node_modules"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react/package.json"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/target"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/target/debug"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/target/debug/main"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("inner"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/node_modules"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react/package.json"), is_dir: false, created_at, inode, device },
]
.into_iter()
.collect::<BTreeSet<_>>();
@ -549,22 +615,24 @@ mod tests {
let root = prepare_location().await;
let root_path = root.path();
let any_datetime = Utc::now();
let created_at = Utc::now();
let inode = 0;
let device = 0;
#[rustfmt::skip]
let expected = [
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("inner"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at: any_datetime },
WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at: any_datetime },
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("inner"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at, inode, device },
WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at, inode, device },
]
.into_iter()
.collect::<BTreeSet<_>>();

View file

@ -17,28 +17,31 @@ type LocationAndLibraryKey = (LocationId, LibraryId);
const LOCATION_CHECK_INTERVAL: Duration = Duration::from_secs(5);
pub(super) async fn check_online(location: &location::Data, library: &Library) -> bool {
let pub_id = &location.pub_id;
pub(super) async fn check_online(
location: &location::Data,
library: &Library,
) -> Result<bool, LocationManagerError> {
let pub_id = Uuid::from_slice(&location.pub_id)?;
if location.node_id == library.node_local_id {
match fs::metadata(&location.path).await {
Ok(_) => {
library.location_manager().add_online(pub_id).await;
true
Ok(true)
}
Err(e) if e.kind() == ErrorKind::NotFound => {
library.location_manager().remove_online(pub_id).await;
false
library.location_manager().remove_online(&pub_id).await;
Ok(false)
}
Err(e) => {
error!("Failed to check if location is online: {:#?}", e);
false
Ok(false)
}
}
} else {
// In this case, we don't have a `local_path`, but this location was marked as online
library.location_manager().remove_online(pub_id).await;
false
library.location_manager().remove_online(&pub_id).await;
Err(LocationManagerError::NonLocalLocation(location.id))
}
}

View file

@ -19,6 +19,7 @@ use tracing::{debug, error};
#[cfg(feature = "location-watcher")]
use tokio::sync::mpsc;
use uuid::Uuid;
use super::{file_path_helper::FilePathError, LocationId};
@ -85,8 +86,12 @@ pub enum LocationManagerError {
#[error("Failed to stop or reinit a watcher: {reason}")]
FailedToStopOrReinitWatcher { reason: String },
#[error("Location missing local path: <id='{0}'>")]
LocationMissingLocalPath(LocationId),
#[error("Missing location from database: <id='{0}'>")]
MissingLocation(LocationId),
#[error("Non local location: <id='{0}'>")]
NonLocalLocation(LocationId),
#[error("Tried to update a non-existing file: <path='{0}'>")]
UpdateNonExistingFile(PathBuf),
#[error("Database error: {0}")]
@ -95,9 +100,11 @@ pub enum LocationManagerError {
IOError(#[from] io::Error),
#[error("File path related error (error: {0})")]
FilePathError(#[from] FilePathError),
#[error("Corrupted location pub_id on database: (error: {0})")]
CorruptedLocationPubId(#[from] uuid::Error),
}
type OnlineLocations = BTreeSet<Vec<u8>>;
type OnlineLocations = BTreeSet<Uuid>;
#[derive(Debug)]
pub struct LocationManager {
@ -324,7 +331,13 @@ impl LocationManager {
// To add a new location
ManagementMessageAction::Add => {
if let Some(location) = get_location(location_id, &library).await {
let is_online = check_online(&location, &library).await;
let is_online = match check_online(&location, &library).await {
Ok(is_online) => is_online,
Err(e) => {
error!("Error while checking online status of location {location_id}: {e}");
continue;
}
};
let _ = response_tx.send(
LocationWatcher::new(location, library.clone())
.await
@ -426,7 +439,15 @@ impl LocationManager {
to_remove.remove(&key);
} else if let Some(location) = get_location(location_id, &library).await {
if location.node_id == library.node_local_id {
if check_online(&location, &library).await
let is_online = match check_online(&location, &library).await {
Ok(is_online) => is_online,
Err(e) => {
error!("Error while checking online status of location {location_id}: {e}");
continue;
}
};
if is_online
&& !forced_unwatch.contains(&key)
{
watch_location(
@ -449,7 +470,7 @@ impl LocationManager {
location_id,
library.id,
"Dropping location from location manager, because \
we don't have a `local_path` anymore",
it isn't a location in the current node",
&mut locations_watched,
&mut locations_unwatched
);
@ -477,7 +498,7 @@ impl LocationManager {
Ok(())
}
pub async fn is_online(&self, id: &Vec<u8>) -> bool {
pub async fn is_online(&self, id: &Uuid) -> bool {
let online_locations = self.online_locations.read().await;
online_locations.contains(id)
}
@ -490,12 +511,12 @@ impl LocationManager {
self.online_tx.send(self.get_online().await).ok();
}
pub async fn add_online(&self, id: &[u8]) {
self.online_locations.write().await.insert(id.into());
pub async fn add_online(&self, id: Uuid) {
self.online_locations.write().await.insert(id);
self.broadcast_online().await;
}
pub async fn remove_online(&self, id: &Vec<u8>) {
pub async fn remove_online(&self, id: &Uuid) {
let mut online_locations = self.online_locations.write().await;
online_locations.remove(id);
self.broadcast_online().await;

View file

@ -1,6 +1,16 @@
use crate::{
library::Library,
location::{location_with_indexer_rules, manager::LocationManagerError},
//! Linux has the best behaving file system events, with just some small caveats:
//! When we move files or directories, we receive 3 events: Rename From, Rename To and Rename Both.
//! But when we move a file or directory to the outside from the watched location, we just receive
//! the Rename From event, so we have to keep track of all rename events to match them against each
//! other. If we have dangling Rename From events, we have to remove them after some time.
//! Aside from that, when a directory is moved to our watched location from the outside, we receive
//! a Create Dir event, this one is actually ok at least.
use crate::{invalidate_query, library::Library, location::manager::LocationManagerError};
use std::{
collections::{BTreeMap, HashMap},
path::PathBuf,
};
use async_trait::async_trait;
@ -8,43 +18,77 @@ use notify::{
event::{AccessKind, AccessMode, CreateKind, ModifyKind, RenameMode},
Event, EventKind,
};
use tracing::trace;
use tokio::{fs, time::Instant};
use tracing::{error, trace};
use super::{
utils::{create_dir, file_creation_or_update, remove_event, rename_both_event},
EventHandler,
utils::{create_dir, file_creation_or_update, remove, rename},
EventHandler, LocationId, HUNDRED_MILLIS,
};
#[derive(Debug)]
pub(super) struct LinuxEventHandler {}
pub(super) struct LinuxEventHandler<'lib> {
location_id: LocationId,
library: &'lib Library,
last_check_rename: Instant,
rename_from: HashMap<PathBuf, Instant>,
rename_from_buffer: Vec<(PathBuf, Instant)>,
recently_renamed_from: BTreeMap<PathBuf, Instant>,
}
#[async_trait]
impl EventHandler for LinuxEventHandler {
fn new() -> Self {
Self {}
impl<'lib> EventHandler<'lib> for LinuxEventHandler<'lib> {
fn new(location_id: LocationId, library: &'lib Library) -> Self {
Self {
location_id,
library,
last_check_rename: Instant::now(),
rename_from: HashMap::new(),
rename_from_buffer: Vec::new(),
recently_renamed_from: BTreeMap::new(),
}
}
async fn handle_event(
&mut self,
location: location_with_indexer_rules::Data,
library: &Library,
event: Event,
) -> Result<(), LocationManagerError> {
trace!("Received Linux event: {:#?}", event);
async fn handle_event(&mut self, event: Event) -> Result<(), LocationManagerError> {
tracing::debug!("Received Linux event: {:#?}", event);
match event.kind {
let Event {
kind, mut paths, ..
} = event;
match kind {
EventKind::Access(AccessKind::Close(AccessMode::Write)) => {
// If a file was closed with write mode, then it was updated or created
file_creation_or_update(&location, &event, library).await?;
file_creation_or_update(self.location_id, &paths[0], self.library).await?;
}
EventKind::Create(CreateKind::Folder) => {
create_dir(&location, &event, library).await?;
let path = &paths[0];
create_dir(
self.location_id,
path,
&fs::metadata(path).await?,
self.library,
)
.await?;
}
EventKind::Modify(ModifyKind::Name(RenameMode::From)) => {
// Just in case we can't garantee that we receive the Rename From event before the
// Rename Both event. Just a safeguard
if self.recently_renamed_from.remove(&paths[0]).is_none() {
self.rename_from.insert(paths.remove(0), Instant::now());
}
}
EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => {
rename_both_event(&location, &event, library).await?;
let from_path = &paths[0];
self.rename_from.remove(from_path);
rename(self.location_id, &paths[1], from_path, self.library).await?;
self.recently_renamed_from
.insert(paths.swap_remove(0), Instant::now());
}
EventKind::Remove(remove_kind) => {
remove_event(&location, &event, remove_kind, library).await?;
EventKind::Remove(_) => {
remove(self.location_id, &paths[0], self.library).await?;
}
other_event_kind => {
trace!("Other Linux event that we don't handle for now: {other_event_kind:#?}");
@ -53,4 +97,37 @@ impl EventHandler for LinuxEventHandler {
Ok(())
}
async fn tick(&mut self) {
if self.last_check_rename.elapsed() > HUNDRED_MILLIS {
self.last_check_rename = Instant::now();
self.handle_rename_from_eviction().await;
self.recently_renamed_from
.retain(|_, instant| instant.elapsed() < HUNDRED_MILLIS);
}
}
}
impl LinuxEventHandler<'_> {
async fn handle_rename_from_eviction(&mut self) {
self.rename_from_buffer.clear();
for (path, instant) in self.rename_from.drain() {
if instant.elapsed() > HUNDRED_MILLIS {
if let Err(e) = remove(self.location_id, &path, self.library).await {
error!("Failed to remove file_path: {e}");
} else {
trace!("Removed file_path due timeout: {}", path.display());
invalidate_query!(self.library, "locations.getExplorerData");
}
} else {
self.rename_from_buffer.push((path, instant));
}
}
for (path, instant) in self.rename_from_buffer.drain(..) {
self.rename_from.insert(path, instant);
}
}
}

View file

@ -1,6 +1,27 @@
//! On MacOS, we use the FSEvents backend of notify-rs and Rename events are pretty complicated;
//! There are just ModifyKind::Name(RenameMode::Any) events and nothing else.
//! This means that we have to link the old path with the new path to know which file was renamed.
//! But you can't forget that rename events aren't always the case that a file name was modified,
//! but its path was modified. So we have to check if the file was moved. When a file is moved
//! inside the same location, we receive 2 events: one for the old path and one for the new path.
//! But when a file is moved to another location, we only receive the old path event... This
//! way we have to handle it like a file deletion, and the same applies for when a file is moved to our
//! current location from anywhere else, where we just receive the new path rename event, which means a
//! creation.
use crate::{
invalidate_query,
library::Library,
location::{location_with_indexer_rules, manager::LocationManagerError},
location::{
file_path_helper::{check_existing_file_path, get_inode_and_device, MaterializedPath},
manager::LocationManagerError,
LocationId,
},
};
use std::{
collections::{BTreeMap, HashMap},
path::PathBuf,
};
use async_trait::async_trait;
@ -8,40 +29,60 @@ use notify::{
event::{CreateKind, DataChange, ModifyKind, RenameMode},
Event, EventKind,
};
use tracing::trace;
use tokio::{fs, io, time::Instant};
use tracing::{error, trace, warn};
use super::{
utils::{create_dir, file_creation_or_update, remove_event, rename},
EventHandler,
utils::{
create_dir, create_dir_or_file, create_file, extract_inode_and_device_from_path,
extract_location_path, remove, rename, update_file,
},
EventHandler, INodeAndDevice, InstantAndPath, HUNDRED_MILLIS, ONE_SECOND,
};
#[derive(Debug, Default)]
pub(super) struct MacOsEventHandler {
latest_created_dir: Option<Event>,
rename_stack: Option<Event>,
#[derive(Debug)]
pub(super) struct MacOsEventHandler<'lib> {
location_id: LocationId,
library: &'lib Library,
recently_created_files: BTreeMap<PathBuf, Instant>,
last_check_created_files: Instant,
latest_created_dir: Option<PathBuf>,
last_check_rename: Instant,
old_paths_map: HashMap<INodeAndDevice, InstantAndPath>,
new_paths_map: HashMap<INodeAndDevice, InstantAndPath>,
paths_map_buffer: Vec<(INodeAndDevice, InstantAndPath)>,
}
#[async_trait]
impl EventHandler for MacOsEventHandler {
fn new() -> Self
impl<'lib> EventHandler<'lib> for MacOsEventHandler<'lib> {
fn new(location_id: LocationId, library: &'lib Library) -> Self
where
Self: Sized,
{
Default::default()
Self {
location_id,
library,
recently_created_files: BTreeMap::new(),
last_check_created_files: Instant::now(),
latest_created_dir: None,
last_check_rename: Instant::now(),
old_paths_map: HashMap::new(),
new_paths_map: HashMap::new(),
paths_map_buffer: Vec::new(),
}
}
async fn handle_event(
&mut self,
location: location_with_indexer_rules::Data,
library: &Library,
event: Event,
) -> Result<(), LocationManagerError> {
async fn handle_event(&mut self, event: Event) -> Result<(), LocationManagerError> {
trace!("Received MacOS event: {:#?}", event);
match event.kind {
let Event {
kind, mut paths, ..
} = event;
match kind {
EventKind::Create(CreateKind::Folder) => {
if let Some(latest_created_dir) = self.latest_created_dir.take() {
if event.paths[0] == latest_created_dir.paths[0] {
if paths[0] == latest_created_dir {
// NOTE: This is a MacOS specific event that happens when a folder is created
					// through Finder. It creates a folder, but 2 events are triggered in
// FSEvents. So we store and check the latest created folder to avoid
@ -50,26 +91,40 @@ impl EventHandler for MacOsEventHandler {
}
}
create_dir(&location, &event, library).await?;
self.latest_created_dir = Some(event);
create_dir(
self.location_id,
&paths[0],
&fs::metadata(&paths[0]).await?,
self.library,
)
.await?;
self.latest_created_dir = Some(paths.remove(0));
}
EventKind::Create(CreateKind::File) => {
create_file(
self.location_id,
&paths[0],
&fs::metadata(&paths[0]).await?,
self.library,
)
.await?;
self.recently_created_files
.insert(paths.remove(0), Instant::now());
}
EventKind::Modify(ModifyKind::Data(DataChange::Content)) => {
// If a file had its content modified, then it was updated or created
file_creation_or_update(&location, &event, library).await?;
}
EventKind::Modify(ModifyKind::Name(RenameMode::Any)) => {
match self.rename_stack.take() {
None => {
self.rename_stack = Some(event);
}
Some(from_event) => {
rename(&event.paths[0], &from_event.paths[0], &location, library).await?;
}
// NOTE: MacOS emits a Create File and then an Update Content event
// when a file is created. So we need to check if the file was recently
				// created to avoid unnecessary updates
if !self.recently_created_files.contains_key(&paths[0]) {
update_file(self.location_id, &paths[0], self.library).await?;
}
}
EventKind::Modify(ModifyKind::Name(RenameMode::Any)) => {
self.handle_single_rename_event(paths.remove(0)).await?;
}
EventKind::Remove(remove_kind) => {
remove_event(&location, &event, remove_kind, library).await?;
EventKind::Remove(_) => {
remove(self.location_id, &paths[0], self.library).await?;
}
other_event_kind => {
trace!("Other MacOS event that we don't handle for now: {other_event_kind:#?}");
@ -78,4 +133,138 @@ impl EventHandler for MacOsEventHandler {
Ok(())
}
async fn tick(&mut self) {
// Cleaning out recently created files that are older than 1 second
if self.last_check_created_files.elapsed() > ONE_SECOND {
self.last_check_created_files = Instant::now();
self.recently_created_files
.retain(|_, created_at| created_at.elapsed() < ONE_SECOND);
}
if self.last_check_rename.elapsed() > HUNDRED_MILLIS {
// Cleaning out recently renamed files that are older than 100 milliseconds
self.handle_create_eviction().await;
self.handle_remove_eviction().await;
}
}
}
impl MacOsEventHandler<'_> {
async fn handle_create_eviction(&mut self) {
// Just to make sure that our buffer is clean
self.paths_map_buffer.clear();
for (inode_and_device, (instant, path)) in self.new_paths_map.drain() {
if instant.elapsed() > HUNDRED_MILLIS {
if let Err(e) = create_dir_or_file(self.location_id, &path, self.library).await {
error!("Failed to create file_path on MacOS : {e}");
} else {
trace!("Created file_path due timeout: {}", path.display());
invalidate_query!(self.library, "locations.getExplorerData");
}
} else {
self.paths_map_buffer
.push((inode_and_device, (instant, path)));
}
}
for (key, value) in self.paths_map_buffer.drain(..) {
self.new_paths_map.insert(key, value);
}
}
async fn handle_remove_eviction(&mut self) {
// Just to make sure that our buffer is clean
self.paths_map_buffer.clear();
for (inode_and_device, (instant, path)) in self.old_paths_map.drain() {
if instant.elapsed() > HUNDRED_MILLIS {
if let Err(e) = remove(self.location_id, &path, self.library).await {
error!("Failed to remove file_path: {e}");
} else {
trace!("Removed file_path due timeout: {}", path.display());
invalidate_query!(self.library, "locations.getExplorerData");
}
} else {
self.paths_map_buffer
.push((inode_and_device, (instant, path)));
}
}
for (key, value) in self.paths_map_buffer.drain(..) {
self.old_paths_map.insert(key, value);
}
}
async fn handle_single_rename_event(
&mut self,
path: PathBuf, // this is used internally only once, so we can use just PathBuf
) -> Result<(), LocationManagerError> {
match fs::metadata(&path).await {
Ok(meta) => {
// File or directory exists, so this can be a "new path" to an actual rename/move or a creation
trace!("Path exists: {}", path.display());
let inode_and_device = get_inode_and_device(&meta)?;
let location_path = extract_location_path(self.location_id, self.library).await?;
if !check_existing_file_path(
&MaterializedPath::new(self.location_id, &location_path, &path, meta.is_dir())?,
&self.library.db,
)
.await?
{
if let Some((_, old_path)) = self.old_paths_map.remove(&inode_and_device) {
trace!(
"Got a match new -> old: {} -> {}",
path.display(),
old_path.display()
);
						// We found the old path for this new path, so we can rename it
rename(self.location_id, &path, &old_path, self.library).await?;
} else {
trace!("No match for new path yet: {}", path.display());
self.new_paths_map
.insert(inode_and_device, (Instant::now(), path));
}
} else {
warn!(
"Received rename event for a file that already exists in the database: {}",
path.display()
);
}
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
// File or directory does not exist in the filesystem, if it exists in the database,
				// then we try pairing it with the new path from our map
trace!("Path doesn't exists: {}", path.display());
let inode_and_device =
extract_inode_and_device_from_path(self.location_id, &path, self.library)
.await?;
if let Some((_, new_path)) = self.new_paths_map.remove(&inode_and_device) {
trace!(
"Got a match old -> new: {} -> {}",
path.display(),
new_path.display()
);
// We found a new path for this old path, so we can rename it
rename(self.location_id, &new_path, &path, self.library).await?;
} else {
trace!("No match for old path yet: {}", path.display());
					// We didn't find a new path for this old path, so we store it for later
self.old_paths_map
.insert(inode_and_device, (Instant::now(), path));
}
}
Err(e) => return Err(e.into()),
}
Ok(())
}
}

View file

@ -1,12 +1,9 @@
use crate::{
library::Library,
location::{find_location, location_with_indexer_rules, LocationId},
prisma::location,
};
use crate::{library::Library, location::LocationId, prisma::location};
use std::{
collections::HashSet,
path::{Path, PathBuf},
time::Duration,
};
use async_trait::async_trait;
@ -16,8 +13,10 @@ use tokio::{
select,
sync::{mpsc, oneshot},
task::{block_in_place, JoinHandle},
time::{interval_at, Instant, MissedTickBehavior},
};
use tracing::{debug, error, warn};
use uuid::Uuid;
use super::LocationManagerError;
@ -30,28 +29,34 @@ mod utils;
use utils::check_event;
#[cfg(target_os = "linux")]
type Handler = linux::LinuxEventHandler;
type Handler<'lib> = linux::LinuxEventHandler<'lib>;
#[cfg(target_os = "macos")]
type Handler = macos::MacOsEventHandler;
type Handler<'lib> = macos::MacOsEventHandler<'lib>;
#[cfg(target_os = "windows")]
type Handler = windows::WindowsEventHandler;
type Handler<'lib> = windows::WindowsEventHandler<'lib>;
pub(super) type IgnorePath = (PathBuf, bool);
type INodeAndDevice = (u64, u64);
type InstantAndPath = (Instant, PathBuf);
const ONE_SECOND: Duration = Duration::from_secs(1);
const HUNDRED_MILLIS: Duration = Duration::from_millis(100);
#[async_trait]
trait EventHandler {
fn new() -> Self
trait EventHandler<'lib> {
fn new(location_id: LocationId, library: &'lib Library) -> Self
where
Self: Sized;
async fn handle_event(
&mut self,
location: location_with_indexer_rules::Data,
library: &Library,
event: Event,
) -> Result<(), LocationManagerError>;
/// Handle a file system event.
async fn handle_event(&mut self, event: Event) -> Result<(), LocationManagerError>;
/// As Event Handlers have some inner state, from time to time we need to call this tick method
/// so the event handler can update its state.
async fn tick(&mut self);
}
#[derive(Debug)]
@ -93,6 +98,7 @@ impl LocationWatcher {
let handle = tokio::spawn(Self::handle_watch_events(
location.id,
Uuid::from_slice(&location.pub_id)?,
library,
events_rx,
ignore_path_rx,
@ -110,15 +116,20 @@ impl LocationWatcher {
async fn handle_watch_events(
location_id: LocationId,
location_pub_id: Uuid,
library: Library,
mut events_rx: mpsc::UnboundedReceiver<notify::Result<Event>>,
mut ignore_path_rx: mpsc::UnboundedReceiver<IgnorePath>,
mut stop_rx: oneshot::Receiver<()>,
) {
let mut event_handler = Handler::new();
let mut event_handler = Handler::new(location_id, &library);
let mut paths_to_ignore = HashSet::new();
let mut handler_interval = interval_at(Instant::now() + HUNDRED_MILLIS, HUNDRED_MILLIS);
// In case of doubt check: https://docs.rs/tokio/latest/tokio/time/enum.MissedTickBehavior.html
handler_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
select! {
Some(event) = events_rx.recv() => {
@ -126,6 +137,7 @@ impl LocationWatcher {
Ok(event) => {
if let Err(e) = Self::handle_single_event(
location_id,
location_pub_id,
event,
&mut event_handler,
&library,
@ -150,6 +162,10 @@ impl LocationWatcher {
}
}
_ = handler_interval.tick() => {
event_handler.tick().await;
}
_ = &mut stop_rx => {
debug!("Stop Location Manager event handler for location: <id='{}'>", location_id);
break
@ -158,32 +174,33 @@ impl LocationWatcher {
}
}
async fn handle_single_event(
async fn handle_single_event<'lib>(
location_id: LocationId,
location_pub_id: Uuid,
event: Event,
event_handler: &mut impl EventHandler,
library: &Library,
event_handler: &mut impl EventHandler<'lib>,
library: &'lib Library,
ignore_paths: &HashSet<PathBuf>,
) -> Result<(), LocationManagerError> {
if !check_event(&event, ignore_paths) {
return Ok(());
}
let Some(location) = find_location(library, location_id)
.include(location_with_indexer_rules::include())
.exec()
.await?
else {
warn!("Tried to handle event for unknown location: <id='{location_id}'>");
return Ok(());
};
// let Some(location) = find_location(library, location_id)
// .include(location_with_indexer_rules::include())
// .exec()
// .await?
// else {
// warn!("Tried to handle event for unknown location: <id='{location_id}'>");
// return Ok(());
// };
if !library.location_manager().is_online(&location.pub_id).await {
if !library.location_manager().is_online(&location_pub_id).await {
warn!("Tried to handle event for offline location: <id='{location_id}'>");
return Ok(());
}
event_handler.handle_event(location, library, event).await
event_handler.handle_event(event).await
}
pub(super) fn ignore_path(
@ -265,7 +282,7 @@ impl Drop for LocationWatcher {
/***************************************************************************************************
* Some tests to validate our assumptions of events through different file systems *
***************************************************************************************************
****************************************************************************************************
* Events dispatched on Linux: *
* Create File: *
* 1) EventKind::Create(CreateKind::File) *
@ -334,26 +351,30 @@ impl Drop for LocationWatcher {
* Events dispatched on iOS: *
* TODO *
* *
**************************************************************************************************/
***************************************************************************************************/
#[cfg(test)]
#[allow(unused)]
mod tests {
#[cfg(target_os = "macos")]
use notify::event::DataChange;
use notify::{
event::{AccessKind, AccessMode, CreateKind, ModifyKind, RemoveKind, RenameMode},
Config, Event, EventKind, RecommendedWatcher, Watcher,
};
use std::io::ErrorKind;
use std::{
io::ErrorKind,
path::{Path, PathBuf},
time::Duration,
};
use notify::{
event::{CreateKind, ModifyKind, RemoveKind, RenameMode},
Config, Event, EventKind, RecommendedWatcher, Watcher,
};
use tempfile::{tempdir, TempDir};
use tokio::{fs, io::AsyncWriteExt, sync::mpsc, time::sleep};
use tracing::{debug, error};
use tracing_test::traced_test;
#[cfg(target_os = "macos")]
use notify::event::DataChange;
#[cfg(target_os = "linux")]
use notify::event::{AccessKind, AccessMode};
async fn setup_watcher() -> (
TempDir,
RecommendedWatcher,

View file

@ -4,11 +4,13 @@ use crate::{
location::{
delete_directory,
file_path_helper::{
extract_materialized_path, file_path_with_object, get_existing_file_or_directory,
get_existing_file_path_with_object, get_parent_dir, MaterializedPath,
extract_materialized_path, file_path_with_object, filter_existing_file_path_params,
get_parent_dir, get_parent_dir_id, loose_find_existing_file_path_params, FilePathError,
MaterializedPath,
},
location_with_indexer_rules,
find_location, location_with_indexer_rules,
manager::LocationManagerError,
scan_location_sub_path, LocationId,
},
object::{
file_identifier::FileMetadata,
@ -18,105 +20,160 @@ use crate::{
},
validation::hash::file_checksum,
},
prisma::{file_path, object},
prisma::{file_path, location, object},
};
#[cfg(target_family = "unix")]
use crate::location::file_path_helper::get_inode_and_device;
#[cfg(target_family = "windows")]
use crate::location::file_path_helper::get_inode_and_device_from_path;
use std::{
collections::HashSet,
path::{Path, PathBuf},
fs::Metadata,
path::{Path, PathBuf, MAIN_SEPARATOR, MAIN_SEPARATOR_STR},
str::FromStr,
};
use chrono::{DateTime, FixedOffset, Local, Utc};
use int_enum::IntEnum;
use notify::{event::RemoveKind, Event};
use prisma_client_rust::{raw, PrismaValue};
use sd_file_ext::extensions::ImageExtension;
use chrono::{DateTime, Local};
use int_enum::IntEnum;
use notify::{Event, EventKind};
use prisma_client_rust::{raw, PrismaValue};
use tokio::{fs, io::ErrorKind};
use tracing::{error, info, trace, warn};
use uuid::Uuid;
use super::INodeAndDevice;
pub(super) fn check_event(event: &Event, ignore_paths: &HashSet<PathBuf>) -> bool {
// if path includes .DS_Store, .spacedrive or is in the `ignore_paths` set, we ignore
// if path includes .DS_Store, .spacedrive file creation or is in the `ignore_paths` set, we ignore
!event.paths.iter().any(|p| {
let path_str = p.to_str().expect("Found non-UTF-8 path");
path_str.contains(".DS_Store")
|| path_str.contains(".spacedrive")
|| (path_str.contains(".spacedrive") && matches!(event.kind, EventKind::Create(_)))
|| ignore_paths.contains(p)
})
}
pub(super) async fn create_dir(
location: &location_with_indexer_rules::Data,
event: &Event,
location_id: LocationId,
path: impl AsRef<Path>,
metadata: &Metadata,
library: &Library,
) -> Result<(), LocationManagerError> {
if location.node_id != library.node_local_id {
return Ok(());
}
let location = find_location(library, location_id)
.include(location_with_indexer_rules::include())
.exec()
.await?
.ok_or(LocationManagerError::MissingLocation(location_id))?;
let path = path.as_ref();
trace!(
"Location: <root_path ='{}'> creating directory: {}",
location.path,
event.paths[0].display()
path.display()
);
let materialized_path =
MaterializedPath::new(location.id, &location.path, &event.paths[0], true)?;
let materialized_path = MaterializedPath::new(location.id, &location.path, path, true)?;
let (inode, device) = {
#[cfg(target_family = "unix")]
{
get_inode_and_device(metadata)?
}
#[cfg(target_family = "windows")]
{
// FIXME: This is a workaround for Windows, because we can't get the inode and device from the metadata
let _ = metadata; // To avoid unused variable warning
get_inode_and_device_from_path(&path).await?
}
};
let parent_directory = get_parent_dir(&materialized_path, &library.db).await?;
trace!("parent_directory: {:?}", parent_directory);
let Some(parent_directory) = parent_directory else {
warn!("Watcher found a path without parent");
warn!("Watcher found a directory without parent");
return Ok(())
};
let created_path = library
.last_file_path_id_manager
.create_file_path(&library.db, materialized_path, Some(parent_directory.id))
.create_file_path(
&library.db,
materialized_path,
Some(parent_directory.id),
inode,
device,
)
.await?;
info!("Created path: {}", created_path.materialized_path);
// scan the new directory
scan_location_sub_path(library, location, &created_path.materialized_path).await;
invalidate_query!(library, "locations.getExplorerData");
Ok(())
}
pub(super) async fn create_file(
location: &location_with_indexer_rules::Data,
event: &Event,
location_id: LocationId,
path: impl AsRef<Path>,
metadata: &Metadata,
library: &Library,
) -> Result<(), LocationManagerError> {
if location.node_id != library.node_local_id {
return Ok(());
}
let full_path = &event.paths[0];
let path = path.as_ref();
let location_path = extract_location_path(location_id, library).await?;
trace!(
"Location: <root_path ='{}'> creating file: {}",
&location.path,
full_path.display()
location_path.display(),
path.display()
);
let db = &library.db;
let materialized_path = MaterializedPath::new(location.id, &location.path, full_path, false)?;
let materialized_path = MaterializedPath::new(location_id, &location_path, path, false)?;
let (inode, device) = {
#[cfg(target_family = "unix")]
{
get_inode_and_device(metadata)?
}
#[cfg(target_family = "windows")]
{
// FIXME: This is a workaround for Windows, because we can't get the inode and device from the metadata
let _ = metadata; // To avoid unused variable warning
get_inode_and_device_from_path(path).await?
}
};
let Some(parent_directory) =
get_parent_dir(&materialized_path, &library.db).await?
get_parent_dir(&materialized_path, db).await?
else {
warn!("Watcher found a path without parent");
warn!("Watcher found a file without parent");
return Ok(())
};
let created_file = library
.last_file_path_id_manager
.create_file_path(&library.db, materialized_path, Some(parent_directory.id))
.create_file_path(
db,
materialized_path,
Some(parent_directory.id),
inode,
device,
)
.await?;
info!("Created path: {}", created_file.materialized_path);
@ -126,32 +183,23 @@ pub(super) async fn create_file(
cas_id,
kind,
fs_metadata,
} = FileMetadata::new(&location.path, &created_file.materialized_path).await?;
} = FileMetadata::new(
&location_path,
&MaterializedPath::from((location_id, &created_file.materialized_path)),
)
.await?;
let existing_object = db
.object()
.find_first(vec![object::file_paths::some(vec![
file_path::cas_id::equals(Some(cas_id.clone())),
])])
.select(object_just_id_has_thumbnail::select())
.exec()
.await?;
let size_str = fs_metadata.len().to_string();
let object = if let Some(object) = existing_object {
db.object()
.update(
object::id::equals(object.id),
vec![
object::size_in_bytes::set(size_str),
object::date_indexed::set(
Utc::now().with_timezone(&FixedOffset::east_opt(0).unwrap()),
),
],
)
.select(object_just_id_has_thumbnail::select())
.exec()
.await?
object
} else {
db.object()
.create(
@ -161,7 +209,6 @@ pub(super) async fn create_file(
DateTime::<Local>::from(fs_metadata.created().unwrap()).into(),
),
object::kind::set(kind.int_value()),
object::size_in_bytes::set(size_str.clone()),
],
)
.select(object_just_id_has_thumbnail::select())
@ -171,15 +218,14 @@ pub(super) async fn create_file(
db.file_path()
.update(
file_path::location_id_id(location.id, created_file.id),
file_path::location_id_id(location_id, created_file.id),
vec![file_path::object_id::set(Some(object.id))],
)
.exec()
.await?;
trace!("object: {:#?}", object);
if !object.has_thumbnail && !created_file.extension.is_empty() {
generate_thumbnail(&created_file.extension, &cas_id, &event.paths[0], library).await;
generate_thumbnail(&created_file.extension, &cas_id, path, library).await;
}
invalidate_query!(library, "locations.getExplorerData");
@ -187,66 +233,109 @@ pub(super) async fn create_file(
Ok(())
}
pub(super) async fn create_dir_or_file(
location_id: LocationId,
path: impl AsRef<Path>,
library: &Library,
) -> Result<Metadata, LocationManagerError> {
let metadata = fs::metadata(path.as_ref()).await?;
if metadata.is_dir() {
create_dir(location_id, path, &metadata, library).await
} else {
create_file(location_id, path, &metadata, library).await
}
.map(|_| metadata)
}
pub(super) async fn file_creation_or_update(
location: &location_with_indexer_rules::Data,
event: &Event,
location_id: LocationId,
full_path: impl AsRef<Path>,
library: &Library,
) -> Result<(), LocationManagerError> {
if let Some(ref file_path) = get_existing_file_path_with_object(
MaterializedPath::new(location.id, &location.path, &event.paths[0], false)?,
&library.db,
)
.await?
let full_path = full_path.as_ref();
let location_path = extract_location_path(location_id, library).await?;
if let Some(ref file_path) = library
.db
.file_path()
.find_first(filter_existing_file_path_params(&MaterializedPath::new(
location_id,
&location_path,
full_path,
false,
)?))
// include object for orphan check
.include(file_path_with_object::include())
.exec()
.await?
{
inner_update_file(location, file_path, event, library).await
inner_update_file(location_id, file_path, full_path, library).await
} else {
// We received None because it is a new file
create_file(location, event, library).await
create_file(
location_id,
full_path,
&fs::metadata(full_path).await?,
library,
)
.await
}
}
pub(super) async fn update_file(
location: &location_with_indexer_rules::Data,
event: &Event,
location_id: LocationId,
full_path: impl AsRef<Path>,
library: &Library,
) -> Result<(), LocationManagerError> {
if location.node_id == library.node_local_id {
if let Some(ref file_path) = get_existing_file_path_with_object(
MaterializedPath::new(location.id, &location.path, &event.paths[0], false)?,
&library.db,
)
let full_path = full_path.as_ref();
let location_path = extract_location_path(location_id, library).await?;
if let Some(ref file_path) = library
.db
.file_path()
.find_first(filter_existing_file_path_params(&MaterializedPath::new(
location_id,
&location_path,
full_path,
false,
)?))
// include object for orphan check
.include(file_path_with_object::include())
.exec()
.await?
{
let ret = inner_update_file(location, file_path, event, library).await;
invalidate_query!(library, "locations.getExplorerData");
ret
} else {
Err(LocationManagerError::UpdateNonExistingFile(
event.paths[0].clone(),
))
}
{
let ret = inner_update_file(location_id, file_path, full_path, library).await;
invalidate_query!(library, "locations.getExplorerData");
ret
} else {
Err(LocationManagerError::LocationMissingLocalPath(location.id))
Err(LocationManagerError::UpdateNonExistingFile(
full_path.to_path_buf(),
))
}
}
async fn inner_update_file(
location: &location_with_indexer_rules::Data,
location_id: LocationId,
file_path: &file_path_with_object::Data,
event: &Event,
full_path: impl AsRef<Path>,
library: &Library,
) -> Result<(), LocationManagerError> {
let full_path = full_path.as_ref();
let location_path = extract_location_path(location_id, library).await?;
trace!(
"Location: <root_path ='{}'> updating file: {}",
&location.path,
event.paths[0].display()
location_path.display(),
full_path.display()
);
let FileMetadata {
cas_id,
fs_metadata,
..
} = FileMetadata::new(&location.path, &file_path.materialized_path).await?;
kind,
} = FileMetadata::new(
&location_path,
&MaterializedPath::from((location_id, &file_path.materialized_path)),
)
.await?;
if let Some(old_cas_id) = &file_path.cas_id {
if old_cas_id != &cas_id {
@ -255,18 +344,17 @@ async fn inner_update_file(
.db
.file_path()
.update(
file_path::location_id_id(location.id, file_path.id),
file_path::location_id_id(location_id, file_path.id),
vec![
file_path::cas_id::set(Some(old_cas_id.clone())),
// file_path::size_in_bytes::set(fs_metadata.len().to_string()),
// file_path::kind::set(kind.int_value()),
file_path::size_in_bytes::set(fs_metadata.len().to_string()),
file_path::date_modified::set(
DateTime::<Local>::from(fs_metadata.created().unwrap()).into(),
),
file_path::integrity_checksum::set(
if file_path.integrity_checksum.is_some() {
// If a checksum was already computed, we need to recompute it
Some(file_checksum(&event.paths[0]).await?)
Some(file_checksum(full_path).await?)
} else {
None
},
@ -276,105 +364,149 @@ async fn inner_update_file(
.exec()
.await?;
if file_path
.object
.as_ref()
.map(|o| o.has_thumbnail)
.unwrap_or_default()
{
if let Some(ref object) = file_path.object {
// if this file had a thumbnail previously, we update it to match the new content
if !file_path.extension.is_empty() {
generate_thumbnail(&file_path.extension, &cas_id, &event.paths[0], library)
.await;
if object.has_thumbnail && !file_path.extension.is_empty() {
generate_thumbnail(&file_path.extension, &cas_id, full_path, library).await;
}
let int_kind = kind.int_value();
if object.kind != int_kind {
library
.db
.object()
.update(
object::id::equals(object.id),
vec![object::kind::set(int_kind)],
)
.exec()
.await?;
}
}
invalidate_query!(library, "locations.getExplorerData");
}
}
invalidate_query!(library, "locations.getExplorerData");
Ok(())
}
pub(super) async fn rename_both_event(
location: &location_with_indexer_rules::Data,
event: &Event,
library: &Library,
) -> Result<(), LocationManagerError> {
rename(&event.paths[1], &event.paths[0], location, library).await
}
pub(super) async fn rename(
location_id: LocationId,
new_path: impl AsRef<Path>,
old_path: impl AsRef<Path>,
location: &location_with_indexer_rules::Data,
library: &Library,
) -> Result<(), LocationManagerError> {
let mut old_path_materialized =
extract_materialized_path(location.id, &location.path, old_path.as_ref())?
let location_path = extract_location_path(location_id, library).await?;
let old_path_materialized =
extract_materialized_path(location_id, &location_path, old_path.as_ref())?;
let mut old_path_materialized_str = format!(
"{MAIN_SEPARATOR_STR}{}",
old_path_materialized
.to_str()
.expect("Found non-UTF-8 path")
.to_string();
);
let new_path_materialized =
extract_materialized_path(location.id, &location.path, new_path.as_ref())?;
let mut new_path_materialized_str = new_path_materialized
.to_str()
.expect("Found non-UTF-8 path")
.to_string();
extract_materialized_path(location_id, &location_path, new_path.as_ref())?;
let mut new_path_materialized_str = format!(
"{MAIN_SEPARATOR_STR}{}",
new_path_materialized
.to_str()
.expect("Found non-UTF-8 path")
);
if let Some(file_path) = get_existing_file_or_directory(location, old_path, &library.db).await?
let old_materialized_path_parent = old_path_materialized
.parent()
.unwrap_or_else(|| Path::new(MAIN_SEPARATOR_STR));
let new_materialized_path_parent = new_path_materialized
.parent()
.unwrap_or_else(|| Path::new(MAIN_SEPARATOR_STR));
// Renaming a file could potentially be a move to another directory, so we check if our parent changed
let changed_parent_id = if old_materialized_path_parent != new_materialized_path_parent {
Some(
get_parent_dir_id(
&MaterializedPath::new(
location_id,
&location_path,
new_path,
true,
)?,
&library.db,
)
.await?
.expect("CRITICAL ERROR: If we're puting a file in a directory inside our location, then this directory must exist"),
)
} else {
None
};
if let Some(file_path) = library
.db
.file_path()
.find_first(loose_find_existing_file_path_params(
&MaterializedPath::new(location_id, &location_path, old_path, true)?,
))
.exec()
.await?
{
// If the renamed path is a directory, we have to update every successor
if file_path.is_dir {
if !old_path_materialized.ends_with('/') {
old_path_materialized += "/";
if !old_path_materialized_str.ends_with(MAIN_SEPARATOR) {
old_path_materialized_str += MAIN_SEPARATOR_STR;
}
if !new_path_materialized_str.ends_with('/') {
new_path_materialized_str += "/";
if !new_path_materialized_str.ends_with(MAIN_SEPARATOR) {
new_path_materialized_str += MAIN_SEPARATOR_STR;
}
let updated = library
.db
._execute_raw(
raw!(
"UPDATE file_path SET materialized_path = REPLACE(materialized_path, {}, {}) WHERE location_id = {}",
PrismaValue::String(old_path_materialized),
PrismaValue::String(new_path_materialized_str.clone()),
PrismaValue::Int(location.id as i64)
)
)
._execute_raw(raw!(
"UPDATE file_path \
SET materialized_path = REPLACE(materialized_path, {}, {}) \
WHERE location_id = {}",
PrismaValue::String(old_path_materialized_str.clone()),
PrismaValue::String(new_path_materialized_str.clone()),
PrismaValue::Int(location_id as i64)
))
.exec()
.await?;
trace!("Updated {updated} file_paths");
}
let mut update_params = vec![
file_path::materialized_path::set(new_path_materialized_str),
file_path::name::set(
new_path_materialized
.file_stem()
.unwrap()
.to_str()
.expect("Found non-UTF-8 path")
.to_string(),
),
file_path::extension::set(
new_path_materialized
.extension()
.map(|s| {
s.to_str()
.expect("Found non-UTF-8 extension in path")
.to_string()
})
.unwrap_or_default(),
),
];
if changed_parent_id.is_some() {
update_params.push(file_path::parent_id::set(changed_parent_id));
}
library
.db
.file_path()
.update(
file_path::location_id_id(file_path.location_id, file_path.id),
vec![
file_path::materialized_path::set(new_path_materialized_str),
file_path::name::set(
new_path_materialized
.file_stem()
.unwrap()
.to_str()
.expect("Found non-UTF-8 path")
.to_string(),
),
file_path::extension::set(
new_path_materialized
.extension()
.map(|s| {
s.to_str()
.expect("Found non-UTF-8 extension in path")
.to_string()
})
.unwrap_or_default(),
),
],
update_params,
)
.exec()
.await?;
@ -384,66 +516,91 @@ pub(super) async fn rename(
Ok(())
}
pub(super) async fn remove_event(
location: &location_with_indexer_rules::Data,
event: &Event,
remove_kind: RemoveKind,
pub(super) async fn remove(
location_id: LocationId,
full_path: impl AsRef<Path>,
library: &Library,
) -> Result<(), LocationManagerError> {
trace!("removed {remove_kind:#?}");
let full_path = full_path.as_ref();
let location_path = extract_location_path(location_id, library).await?;
// if it doesn't either way, then we don't care
if let Some(file_path) =
get_existing_file_or_directory(location, &event.paths[0], &library.db).await?
{
// check file still exists on disk
match fs::metadata(&event.paths[0]).await {
Ok(_) => {
todo!("file has changed in some way, re-identify it")
}
Err(e) if e.kind() == ErrorKind::NotFound => {
// if is doesn't, we can remove it safely from our db
if file_path.is_dir {
delete_directory(library, location.id, Some(file_path.materialized_path))
.await?;
} else {
// if it doesn't exist either way, then we don't care
let Some(file_path) = library.db
.file_path()
.find_first(loose_find_existing_file_path_params(
&MaterializedPath::new(location_id, &location_path, full_path, true)?,
))
.exec()
.await? else {
return Ok(());
};
remove_by_file_path(location_id, full_path, &file_path, library).await
}
pub(super) async fn remove_by_file_path(
location_id: LocationId,
path: impl AsRef<Path>,
file_path: &file_path::Data,
library: &Library,
) -> Result<(), LocationManagerError> {
// check file still exists on disk
match fs::metadata(path).await {
Ok(_) => {
todo!("file has changed in some way, re-identify it")
}
Err(e) if e.kind() == ErrorKind::NotFound => {
// if is doesn't, we can remove it safely from our db
if file_path.is_dir {
delete_directory(
library,
location_id,
Some(file_path.materialized_path.clone()),
)
.await?;
} else {
library
.db
.file_path()
.delete(file_path::location_id_id(location_id, file_path.id))
.exec()
.await?;
if let Some(object_id) = file_path.object_id {
library
.db
.file_path()
.delete(file_path::location_id_id(location.id, file_path.id))
.object()
.delete_many(vec![
object::id::equals(object_id),
// https://www.prisma.io/docs/reference/api-reference/prisma-client-reference#none
object::file_paths::none(vec![]),
])
.exec()
.await?;
if let Some(object_id) = file_path.object_id {
library
.db
.object()
.delete_many(vec![
object::id::equals(object_id),
// https://www.prisma.io/docs/reference/api-reference/prisma-client-reference#none
object::file_paths::none(vec![]),
])
.exec()
.await?;
}
}
}
Err(e) => return Err(e.into()),
}
invalidate_query!(library, "locations.getExplorerData");
Err(e) => return Err(e.into()),
}
// If the file paths we just removed were the last ids in the DB, we decresed the last id from the id manager
library
.last_file_path_id_manager
.sync(location_id, &library.db)
.await?;
invalidate_query!(library, "locations.getExplorerData");
Ok(())
}
async fn generate_thumbnail(
extension: &str,
cas_id: &str,
file_path: impl AsRef<Path>,
path: impl AsRef<Path>,
library: &Library,
) {
let file_path = file_path.as_ref();
let path = path.as_ref();
let output_path = library
.config()
.data_directory()
@ -453,7 +610,7 @@ async fn generate_thumbnail(
if let Ok(extension) = ImageExtension::from_str(extension) {
if can_generate_thumbnail_for_image(&extension) {
if let Err(e) = generate_image_thumbnail(file_path, &output_path).await {
if let Err(e) = generate_image_thumbnail(path, &output_path).await {
error!("Failed to image thumbnail on location manager: {e:#?}");
}
}
@ -466,10 +623,55 @@ async fn generate_thumbnail(
if let Ok(extension) = VideoExtension::from_str(extension) {
if can_generate_thumbnail_for_video(&extension) {
if let Err(e) = generate_video_thumbnail(file_path, &output_path).await {
if let Err(e) = generate_video_thumbnail(path, &output_path).await {
error!("Failed to video thumbnail on location manager: {e:#?}");
}
}
}
}
}
pub(super) async fn extract_inode_and_device_from_path(
location_id: LocationId,
path: impl AsRef<Path>,
library: &Library,
) -> Result<INodeAndDevice, LocationManagerError> {
let path = path.as_ref();
let location = find_location(library, location_id)
.select(location::select!({ path }))
.exec()
.await?
.ok_or(LocationManagerError::MissingLocation(location_id))?;
library
.db
.file_path()
.find_first(loose_find_existing_file_path_params(
&MaterializedPath::new(location_id, &location.path, path, true)?,
))
.select(file_path::select!( {inode device} ))
.exec()
.await?
.map(|file_path| {
(
u64::from_le_bytes(file_path.inode[0..8].try_into().unwrap()),
u64::from_le_bytes(file_path.device[0..8].try_into().unwrap()),
)
})
.ok_or_else(|| FilePathError::NotFound(path.to_path_buf()).into())
}
pub(super) async fn extract_location_path(
location_id: LocationId,
library: &Library,
) -> Result<PathBuf, LocationManagerError> {
find_location(library, location_id)
.select(location::select!({ path }))
.exec()
.await?
.map_or(
Err(LocationManagerError::MissingLocation(location_id)),
// NOTE: The following usage of `PathBuf` doesn't incur a new allocation so it's fine
|location| Ok(PathBuf::from(location.path)),
)
}

View file

@ -1,6 +1,23 @@
//! Windows file system event handler implementation has some caveats die to how
//! file system events are emitted on Windows.
//!
//! For example: When a file is moved to another
//! directory, we receive a remove event and then a create event, so to avoid having to actually
//! remove and create the `file_path` in the database, we have to wait some time after receiving
//! a remove event to see if a create event is emitted. If it is, we just update the `file_path`
//! in the database. If not, we remove the file from the database.
use crate::{
invalidate_query,
library::Library,
location::{location_with_indexer_rules, manager::LocationManagerError},
location::{
file_path_helper::get_inode_and_device_from_path, manager::LocationManagerError, LocationId,
},
};
use std::{
collections::{BTreeMap, HashMap},
path::PathBuf,
};
use async_trait::async_trait;
@ -8,70 +25,126 @@ use notify::{
event::{CreateKind, ModifyKind, RenameMode},
Event, EventKind,
};
use tokio::fs;
use tracing::{trace, warn};
use tokio::{fs, time::Instant};
use tracing::{error, trace};
use super::{
utils::{create_dir, create_file, remove_event, rename, update_file},
EventHandler,
utils::{create_dir_or_file, extract_inode_and_device_from_path, remove, rename, update_file},
EventHandler, INodeAndDevice, InstantAndPath, HUNDRED_MILLIS, ONE_SECOND,
};
#[derive(Debug, Default)]
pub(super) struct WindowsEventHandler {
rename_stack: Option<Event>,
create_file_stack: Option<Event>,
/// Windows file system event handler
#[derive(Debug)]
pub(super) struct WindowsEventHandler<'lib> {
location_id: LocationId,
library: &'lib Library,
last_check_recently_files: Instant,
recently_created_files: BTreeMap<PathBuf, Instant>,
last_check_rename_and_remove: Instant,
rename_from_map: BTreeMap<INodeAndDevice, InstantAndPath>,
rename_to_map: BTreeMap<INodeAndDevice, InstantAndPath>,
to_remove_files: HashMap<INodeAndDevice, InstantAndPath>,
removal_buffer: Vec<(INodeAndDevice, InstantAndPath)>,
}
#[async_trait]
impl EventHandler for WindowsEventHandler {
fn new() -> Self
impl<'lib> EventHandler<'lib> for WindowsEventHandler<'lib> {
fn new(location_id: LocationId, library: &'lib Library) -> Self
where
Self: Sized,
{
Default::default()
Self {
location_id,
library,
last_check_recently_files: Instant::now(),
recently_created_files: BTreeMap::new(),
last_check_rename_and_remove: Instant::now(),
rename_from_map: BTreeMap::new(),
rename_to_map: BTreeMap::new(),
to_remove_files: HashMap::new(),
removal_buffer: Vec::new(),
}
}
async fn handle_event(
&mut self,
location: location_with_indexer_rules::Data,
library: &Library,
event: Event,
) -> Result<(), LocationManagerError> {
async fn handle_event(&mut self, event: Event) -> Result<(), LocationManagerError> {
trace!("Received Windows event: {:#?}", event);
let Event {
kind, mut paths, ..
} = event;
match event.kind {
match kind {
EventKind::Create(CreateKind::Any) => {
let metadata = fs::metadata(&event.paths[0]).await?;
if metadata.is_file() {
self.create_file_stack = Some(event);
let inode_and_device = get_inode_and_device_from_path(&paths[0]).await?;
if let Some((_, old_path)) = self.to_remove_files.remove(&inode_and_device) {
// if previously we added a file to be removed with the same inode and device
// of this "newly created" created file, it means that the file was just moved to another location
// so we can treat if just as a file rename, like in other OSes
trace!(
"Got a rename instead of remove/create: {} -> {}",
old_path.display(),
paths[0].display(),
);
// We found a new path for this old path, so we can rename it instead of removing and creating it
rename(self.location_id, &paths[0], &old_path, self.library).await?;
} else {
create_dir(&location, &event, library).await?;
let metadata =
create_dir_or_file(self.location_id, &paths[0], self.library).await?;
if metadata.is_file() {
self.recently_created_files
.insert(paths.remove(0), Instant::now());
}
}
}
EventKind::Modify(ModifyKind::Any) => {
let metadata = fs::metadata(&event.paths[0]).await?;
if metadata.is_file() {
if let Some(create_file_event) = self.create_file_stack.take() {
create_file(&location, &create_file_event, library).await?;
} else {
update_file(&location, &event, library).await?;
// Windows emite events of update right after create events
if !self.recently_created_files.contains_key(&paths[0]) {
let metadata = fs::metadata(&paths[0]).await?;
if metadata.is_file() {
update_file(self.location_id, &paths[0], self.library).await?;
}
} else {
warn!("Unexpected Windows modify event on a directory");
}
}
EventKind::Modify(ModifyKind::Name(RenameMode::From)) => {
self.rename_stack = Some(event);
let path = paths.remove(0);
let inode_and_device =
extract_inode_and_device_from_path(self.location_id, &path, self.library)
.await?;
if let Some((_, new_path)) = self.rename_to_map.remove(&inode_and_device) {
// We found a new path for this old path, so we can rename it
rename(self.location_id, &new_path, &path, self.library).await?;
} else {
self.rename_from_map
.insert(inode_and_device, (Instant::now(), path));
}
}
EventKind::Modify(ModifyKind::Name(RenameMode::To)) => {
let from_event = self
.rename_stack
.take()
.expect("Unexpectedly missing rename from windows event");
rename(&event.paths[0], &from_event.paths[0], &location, library).await?;
let path = paths.remove(0);
let inode_and_device =
extract_inode_and_device_from_path(self.location_id, &path, self.library)
.await?;
if let Some((_, old_path)) = self.rename_to_map.remove(&inode_and_device) {
// We found a old path for this new path, so we can rename it
rename(self.location_id, &path, &old_path, self.library).await?;
} else {
self.rename_from_map
.insert(inode_and_device, (Instant::now(), path));
}
}
EventKind::Remove(remove_kind) => {
remove_event(&location, &event, remove_kind, library).await?;
EventKind::Remove(_) => {
let path = paths.remove(0);
self.to_remove_files.insert(
extract_inode_and_device_from_path(self.location_id, &path, self.library)
.await?,
(Instant::now(), path),
);
}
other_event_kind => {
@ -81,4 +154,56 @@ impl EventHandler for WindowsEventHandler {
Ok(())
}
async fn tick(&mut self) {
// Cleaning out recently created files that are older than 1 second
if self.last_check_recently_files.elapsed() > ONE_SECOND {
self.last_check_recently_files = Instant::now();
self.recently_created_files
.retain(|_, created_at| created_at.elapsed() < ONE_SECOND);
}
if self.last_check_rename_and_remove.elapsed() > HUNDRED_MILLIS {
self.last_check_rename_and_remove = Instant::now();
self.rename_from_map.retain(|_, (created_at, path)| {
let to_retain = created_at.elapsed() < HUNDRED_MILLIS;
if !to_retain {
trace!("Removing from rename from map: {:#?}", path.display())
}
to_retain
});
self.rename_to_map.retain(|_, (created_at, path)| {
let to_retain = created_at.elapsed() < HUNDRED_MILLIS;
if !to_retain {
trace!("Removing from rename to map: {:#?}", path.display())
}
to_retain
});
self.handle_removes_eviction().await;
}
}
}
impl WindowsEventHandler<'_> {
async fn handle_removes_eviction(&mut self) {
self.removal_buffer.clear();
for (inode_and_device, (instant, path)) in self.to_remove_files.drain() {
if instant.elapsed() > HUNDRED_MILLIS {
if let Err(e) = remove(self.location_id, &path, self.library).await {
error!("Failed to remove file_path: {e}");
} else {
trace!("Removed file_path due timeout: {}", path.display());
invalidate_query!(self.library, "locations.getExplorerData");
}
} else {
self.removal_buffer
.push((inode_and_device, (instant, path)));
}
}
for (key, value) in self.removal_buffer.drain(..) {
self.to_remove_files.insert(key, value);
}
}
}

View file

@ -359,15 +359,15 @@ pub async fn scan_location(
Ok(())
}
#[allow(dead_code)]
#[cfg(feature = "location-watcher")]
pub async fn scan_location_sub_path(
library: &Library,
location: location_with_indexer_rules::Data,
sub_path: impl AsRef<Path>,
) -> Result<(), LocationError> {
) {
let sub_path = sub_path.as_ref().to_path_buf();
if location.node_id != library.node_local_id {
return Ok(());
return;
}
library
@ -400,8 +400,6 @@ pub async fn scan_location_sub_path(
IndexerJob {},
))
.await;
Ok(())
}
pub async fn light_scan_location(

View file

@ -1,44 +1,59 @@
use blake3::Hasher;
use std::path::Path;
use blake3::Hasher;
use static_assertions::const_assert;
use tokio::{
fs::File,
fs::{self, File},
io::{self, AsyncReadExt, AsyncSeekExt, SeekFrom},
};
static SAMPLE_COUNT: u64 = 4;
static SAMPLE_SIZE: u64 = 10000;
const SAMPLE_COUNT: u64 = 4;
const SAMPLE_SIZE: u64 = 1024 * 10;
const HEADER_OR_FOOTER_SIZE: u64 = 1024 * 8;
async fn read_at(file: &mut File, offset: u64, size: u64) -> Result<Vec<u8>, io::Error> {
let mut buf = vec![0u8; size as usize];
// minimum file size of 100KiB, to avoid sample hashing for small files as they can be smaller than the total sample size
const MINIMUM_FILE_SIZE: u64 = 1024 * 100;
file.seek(SeekFrom::Start(offset)).await?;
file.read_exact(&mut buf).await?;
Ok(buf)
}
// Asserting that nobody messed up our consts
const_assert!(
HEADER_OR_FOOTER_SIZE + SAMPLE_COUNT * SAMPLE_SIZE + HEADER_OR_FOOTER_SIZE < MINIMUM_FILE_SIZE
);
pub async fn generate_cas_id(path: impl AsRef<Path>, size: u64) -> Result<String, io::Error> {
let mut file = File::open(path).await?;
let mut hasher = Hasher::new();
hasher.update(&size.to_le_bytes());
let sample_interval = if SAMPLE_COUNT * SAMPLE_SIZE > size {
size
if size <= MINIMUM_FILE_SIZE {
// For small files, we hash the whole file
fs::read(path).await.map(|buf| {
hasher.update(&buf);
})?;
} else {
size / SAMPLE_COUNT
};
let mut file = File::open(path).await?;
let mut buf = vec![0; SAMPLE_SIZE as usize].into_boxed_slice();
for i in 0..=SAMPLE_COUNT {
let offset = if i == SAMPLE_COUNT {
size - SAMPLE_SIZE
} else {
sample_interval * i
};
let buf = read_at(&mut file, offset, SAMPLE_SIZE).await?;
// Hashing the header
file.read_exact(&mut buf[..HEADER_OR_FOOTER_SIZE as usize])
.await?;
hasher.update(&buf);
// Sample hashing the inner content of the file
for _ in 0..SAMPLE_COUNT {
file.seek(SeekFrom::Current(
((size - HEADER_OR_FOOTER_SIZE * 2) / SAMPLE_COUNT) as i64,
))
.await?;
file.read_exact(&mut buf).await?;
hasher.update(&buf);
}
// Hashing the footer
file.seek(SeekFrom::End(-(HEADER_OR_FOOTER_SIZE as i64)))
.await?;
file.read_exact(&mut buf[..HEADER_OR_FOOTER_SIZE as usize])
.await?;
hasher.update(&buf);
}
let mut id = hasher.finalize().to_hex();
id.truncate(16);
Ok(id.to_string())
Ok(hasher.finalize().to_hex()[..16].to_string())
}

View file

@ -50,7 +50,7 @@ impl Hash for FileIdentifierJobInit {
pub struct FileIdentifierJobState {
cursor: FilePathIdAndLocationIdCursor,
report: FileIdentifierReport,
maybe_sub_materialized_path: Option<MaterializedPath>,
maybe_sub_materialized_path: Option<MaterializedPath<'static>>,
}
#[async_trait::async_trait]
@ -192,7 +192,7 @@ impl StatefulJob for FileIdentifierJob {
fn orphan_path_filters(
location_id: i32,
file_path_id: Option<i32>,
maybe_sub_materialized_path: &Option<MaterializedPath>,
maybe_sub_materialized_path: &Option<MaterializedPath<'_>>,
) -> Vec<file_path::WhereParam> {
let mut params = vec![
file_path::object_id::equals(None),
@ -216,7 +216,7 @@ fn orphan_path_filters(
async fn count_orphan_file_paths(
db: &PrismaClient,
location_id: i32,
maybe_sub_materialized_path: &Option<MaterializedPath>,
maybe_sub_materialized_path: &Option<MaterializedPath<'_>>,
) -> Result<usize, prisma_client_rust::QueryError> {
db.file_path()
.count(orphan_path_filters(
@ -232,7 +232,7 @@ async fn count_orphan_file_paths(
async fn get_orphan_file_paths(
db: &PrismaClient,
cursor: &FilePathIdAndLocationIdCursor,
maybe_sub_materialized_path: &Option<MaterializedPath>,
maybe_sub_materialized_path: &Option<MaterializedPath<'_>>,
) -> Result<Vec<file_path_for_file_identifier::Data>, prisma_client_rust::QueryError> {
info!(
"Querying {} orphan Paths at cursor: {:?}",

View file

@ -2,7 +2,7 @@ use crate::{
invalidate_query,
job::{JobError, JobReportUpdate, JobResult, WorkerContext},
library::Library,
location::file_path_helper::{file_path_for_file_identifier, FilePathError},
location::file_path_helper::{file_path_for_file_identifier, FilePathError, MaterializedPath},
object::{cas::generate_cas_id, object_for_file_identifier},
prisma::{file_path, location, object, PrismaClient},
sync,
@ -48,9 +48,9 @@ impl FileMetadata {
/// Assembles `create_unchecked` params for a given file path
pub async fn new(
location_path: impl AsRef<Path>,
materialized_path: impl AsRef<Path>, // TODO: use dedicated CreateUnchecked type
materialized_path: &MaterializedPath<'_>, // TODO: use dedicated CreateUnchecked type
) -> Result<FileMetadata, io::Error> {
let path = location_path.as_ref().join(materialized_path.as_ref());
let path = location_path.as_ref().join(materialized_path);
let fs_metadata = fs::metadata(&path).await?;
@ -67,7 +67,7 @@ impl FileMetadata {
let cas_id = generate_cas_id(&path, fs_metadata.len()).await?;
info!("Analyzed file: {:?} {:?} {:?}", path, cas_id, kind);
info!("Analyzed file: {path:?} {cas_id:?} {kind:?}");
Ok(FileMetadata {
cas_id,
@ -104,9 +104,13 @@ async fn identifier_job_step(
file_paths: &[file_path_for_file_identifier::Data],
) -> Result<(usize, usize), JobError> {
let file_path_metas = join_all(file_paths.iter().map(|file_path| async move {
FileMetadata::new(&location.path, &file_path.materialized_path)
.await
.map(|params| (file_path.id, (params, file_path)))
// NOTE: `file_path`'s `materialized_path` begins with a `/` character so we remove it to join it with `location.path`
FileMetadata::new(
&location.path,
&MaterializedPath::from((location.id, &file_path.materialized_path)),
)
.await
.map(|params| (file_path.id, (params, file_path)))
}))
.await
.into_iter()
@ -256,7 +260,6 @@ async fn identifier_job_step(
vec![
object::date_created::set(fp.date_created),
object::kind::set(kind),
object::size_in_bytes::set(size),
],
),
);

View file

@ -77,7 +77,7 @@ impl StatefulJob for ShallowFileIdentifierJob {
.map_err(FileIdentifierJobError::from)?;
get_existing_file_path_id(
MaterializedPath::new(location_id, location_path, &full_path, true)
&MaterializedPath::new(location_id, location_path, &full_path, true)
.map_err(FileIdentifierJobError::from)?,
db,
)
@ -86,7 +86,7 @@ impl StatefulJob for ShallowFileIdentifierJob {
.expect("Sub path should already exist in the database")
} else {
get_existing_file_path_id(
MaterializedPath::new(location_id, location_path, location_path, true)
&MaterializedPath::new(location_id, location_path, location_path, true)
.map_err(FileIdentifierJobError::from)?,
db,
)

View file

@ -41,7 +41,6 @@ pub struct Metadata {
pub important: bool,
pub note: Option<String>,
pub date_created: chrono::DateTime<FixedOffset>,
pub date_modified: chrono::DateTime<FixedOffset>,
}
const JOB_NAME: &str = "file_encryptor";
@ -148,7 +147,7 @@ impl StatefulJob for FileEncryptorJob {
if state.init.metadata || state.init.preview_media {
// if any are requested, we can make the query as it'll be used at least once
if let Some(object) = info.path_data.object.clone() {
if let Some(ref object) = info.path_data.object {
if state.init.metadata {
let metadata = Metadata {
path_id: state.init.path_id,
@ -156,9 +155,8 @@ impl StatefulJob for FileEncryptorJob {
hidden: object.hidden,
favorite: object.favorite,
important: object.important,
note: object.note,
note: object.note.clone(),
date_created: object.date_created,
date_modified: object.date_modified,
};
header

View file

@ -1,6 +1,6 @@
use crate::{
job::JobError,
location::file_path_helper::file_path_with_object,
location::file_path_helper::{file_path_with_object, MaterializedPath},
prisma::{file_path, location, PrismaClient},
};
@ -76,7 +76,10 @@ pub async fn context_menu_fs_info(
Ok(FsInfo {
fs_path: get_path_from_location_id(db, location_id)
.await?
.join(&path_data.materialized_path),
.join(&MaterializedPath::from((
location_id,
&path_data.materialized_path,
))),
path_data,
})
}

View file

@ -3,7 +3,9 @@ use crate::{
invalidate_query,
job::{JobError, JobReportUpdate, JobResult, WorkerContext},
location::{
file_path_helper::{file_path_just_materialized_path_cas_id, FilePathError},
file_path_helper::{
file_path_just_materialized_path_cas_id, FilePathError, MaterializedPath,
},
LocationId,
},
};
@ -148,7 +150,10 @@ fn finalize_thumbnailer(data: &ThumbnailerJobState, ctx: WorkerContext) -> JobRe
"Finished thumbnail generation for location {} at {}",
data.report.location_id,
data.location_path
.join(&data.report.materialized_path)
.join(&MaterializedPath::from((
data.report.location_id,
&data.report.materialized_path
)))
.display()
);
@ -185,7 +190,10 @@ async fn inner_process_step(
ctx: &WorkerContext,
) -> Result<(), JobError> {
// assemble the file path
let path = data.location_path.join(&step.file_path.materialized_path);
let path = data.location_path.join(&MaterializedPath::from((
data.report.location_id,
&step.file_path.materialized_path,
)));
trace!("image_file {:?}", step);
// get cas_id, if none found skip

View file

@ -14,7 +14,7 @@ use crate::{
use std::{
collections::VecDeque,
hash::Hash,
path::{Path, PathBuf},
path::{Path, PathBuf, MAIN_SEPARATOR_STR},
};
use sd_file_ext::extensions::Extension;
@ -80,7 +80,7 @@ impl StatefulJob for ShallowThumbnailerJob {
.map_err(ThumbnailerError::from)?;
get_existing_file_path_id(
MaterializedPath::new(location_id, &location_path, &full_path, true)
&MaterializedPath::new(location_id, &location_path, &full_path, true)
.map_err(ThumbnailerError::from)?,
db,
)
@ -89,7 +89,7 @@ impl StatefulJob for ShallowThumbnailerJob {
.expect("Sub path should already exist in the database")
} else {
get_existing_file_path_id(
MaterializedPath::new(location_id, &location_path, &location_path, true)
&MaterializedPath::new(location_id, &location_path, &location_path, true)
.map_err(ThumbnailerError::from)?,
db,
)
@ -149,7 +149,7 @@ impl StatefulJob for ShallowThumbnailerJob {
// SAFETY: We know that the sub_path is a valid UTF-8 string because we validated it before
state.init.sub_path.to_str().unwrap().to_string()
} else {
"".to_string()
MAIN_SEPARATOR_STR.to_string()
},
thumbnails_created: 0,
},

View file

@ -167,7 +167,7 @@ impl StatefulJob for ThumbnailerJob {
async fn get_files_by_extensions(
db: &PrismaClient,
materialized_path: &MaterializedPath,
materialized_path: &MaterializedPath<'_>,
extensions: &[Extension],
kind: ThumbnailerJobStepKind,
) -> Result<Vec<ThumbnailerJobStep>, JobError> {

View file

@ -1,7 +1,7 @@
use crate::{
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
library::Library,
location::file_path_helper::file_path_for_object_validator,
location::file_path_helper::{file_path_for_object_validator, MaterializedPath},
prisma::{file_path, location},
sync,
};
@ -94,7 +94,11 @@ impl StatefulJob for ObjectValidatorJob {
// we can also compare old and new checksums here
// This if is just to make sure, we already queried objects where integrity_checksum is null
if file_path.integrity_checksum.is_none() {
let checksum = file_checksum(data.root_path.join(&file_path.materialized_path)).await?;
let checksum = file_checksum(data.root_path.join(&MaterializedPath::from((
file_path.location.id,
&file_path.materialized_path,
))))
.await?;
sync.write_op(
db,

View file

@ -1,7 +1,10 @@
use crate::prisma::*;
use sd_sync::*;
use serde_json::{from_value, json, to_vec, Value};
use std::{collections::HashMap, sync::Arc};
use sd_sync::*;
use serde_json::{from_value, json, to_vec, Value};
use tokio::sync::broadcast::{self, Receiver, Sender};
use uhlc::{HLCBuilder, HLC, NTP64};
use uuid::Uuid;
@ -220,6 +223,8 @@ impl SyncManager {
}),
)
.unwrap(),
serde_json::from_value(data.remove("inode").unwrap()).unwrap(),
serde_json::from_value(data.remove("device").unwrap()).unwrap(),
data.into_iter()
.flat_map(|(k, v)| file_path::SetParam::deserialize(&k, v))
.collect(),

View file

@ -3,28 +3,27 @@ use std::ffi::CString;
use std::path::Path;
pub(crate) fn from_path(path: impl AsRef<Path>) -> Result<CString, ThumbnailerError> {
let path = path.as_ref();
#[cfg(unix)]
{
use std::os::unix::ffi::OsStrExt;
CString::new(path.as_ref().as_os_str().as_bytes())
.map_err(|_| ThumbnailerError::PathConversion(path.as_ref().to_path_buf()))
CString::new(path.as_os_str().as_bytes())
.map_err(|_| ThumbnailerError::PathConversion(path.to_path_buf()))
}
#[cfg(windows)]
{
use std::os::windows::ffi::OsStrExt;
CString::from_vec_with_nul(
path.as_ref()
.as_os_str()
path.as_os_str()
.encode_wide()
.chain(Some(0))
.map(|b| {
.flat_map(|b| {
let b = b.to_ne_bytes();
b.get(0).map(|s| *s).into_iter().chain(b.get(1).map(|s| *s))
b.first().copied().into_iter().chain(b.get(1).copied())
})
.flatten()
.collect::<Vec<u8>>(),
)
.map_err(|_| ThumbnailerError::PathConversion(path.as_ref().to_path_buf()))
.map_err(|_| ThumbnailerError::PathConversion(path.to_path_buf()))
}
}

View file

@ -4,7 +4,7 @@
export type Procedures = {
queries:
{ key: "buildInfo", input: never, result: BuildInfo } |
{ key: "files.get", input: LibraryArgs<GetArgs>, result: { id: number, pub_id: number[], name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string, file_paths: FilePath[], media_data: MediaData | null } | null } |
{ key: "files.get", input: LibraryArgs<GetArgs>, result: { id: number, pub_id: number[], kind: number, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, file_paths: FilePath[], media_data: MediaData | null } | null } |
{ key: "jobs.getHistory", input: LibraryArgs<null>, result: JobReport[] } |
{ key: "jobs.getRunning", input: LibraryArgs<null>, result: JobReport[] } |
{ key: "jobs.isRunning", input: LibraryArgs<null>, result: boolean } |
@ -79,7 +79,7 @@ export type Procedures = {
subscriptions:
{ key: "invalidateQuery", input: never, result: InvalidateOperationEvent } |
{ key: "jobs.newThumbnail", input: LibraryArgs<null>, result: string } |
{ key: "locations.online", input: never, result: number[][] } |
{ key: "locations.online", input: never, result: string[] } |
{ key: "p2p.events", input: never, result: P2PEvent } |
{ key: "sync.newMessage", input: LibraryArgs<null>, result: CRDTOperation }
};
@ -134,7 +134,7 @@ export type FileEncryptorJobInit = { location_id: number, path_id: number, key_u
export type FileEraserJobInit = { location_id: number, path_id: number, passes: string }
export type FilePath = { id: number, is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string }
export type FilePath = { id: number, is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, size_in_bytes: string, inode: number[], device: number[], object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string }
export type GenerateThumbsForLocationArgs = { id: number, path: string }
@ -224,7 +224,7 @@ export type NodeState = (({ version: string | null }) & { id: string, name: stri
*/
export type Nonce = { XChaCha20Poly1305: number[] } | { Aes256Gcm: number[] }
export type Object = { id: number, pub_id: number[], name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string }
export type Object = { id: number, pub_id: number[], kind: number, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string }
export type ObjectValidatorArgs = { id: number, path: string }
@ -316,8 +316,8 @@ export type UnlockKeyManagerArgs = { password: string, secret_key: string }
export type Volume = { name: string, mount_point: string, total_capacity: string, available_capacity: string, is_removable: boolean, disk_type: string | null, file_system: string | null, is_root_filesystem: boolean }
export type file_path_with_object = { id: number, is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, object: Object | null }
export type file_path_with_object = { id: number, is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, size_in_bytes: string, inode: number[], device: number[], object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, object: Object | null }
export type location_with_indexer_rules = { id: number, pub_id: number[], node_id: number, name: string, path: string, total_capacity: number | null, available_capacity: number | null, is_archived: boolean, generate_preview_media: boolean, sync_preview_media: boolean, hidden: boolean, date_created: string, indexer_rules: { indexer_rule: IndexerRule }[] }
export type object_with_file_paths = { id: number, pub_id: number[], name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string, file_paths: FilePath[] }
export type object_with_file_paths = { id: number, pub_id: number[], kind: number, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, file_paths: FilePath[] }