Give FilePath a pub_id (#691)

* checkpoint - indexer parent ids need fixing

* use pub_id for file paths

* Removing LastFilePathIdManager and fixing some loose ends

---------

Co-authored-by: Ericson "Fogo" Soares <ericson.ds999@gmail.com>
Brendan Allan 2023-04-13 12:58:23 +08:00 committed by GitHub
parent 1e05226efb
commit 3c494ea96e
16 changed files with 306 additions and 433 deletions
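
At a glance, the change replaces FilePath's composite primary key (location_id, id) with a plain autoincrement id plus a globally unique pub_id (a UUID stored as Bytes). A minimal sketch of how lookups shift, using the generated prisma-client-rust accessors that appear throughout this diff; it assumes the crate's generated prisma module and an async context with a connected db handle:

// Before: rows were addressed by the composite unique (location_id, id).
let old = db
    .file_path()
    .find_unique(file_path::location_id_id(location_id, file_path_id))
    .exec()
    .await?;

// After: id is unique on its own, and pub_id is a second unique handle used for sync.
let by_id = db
    .file_path()
    .find_unique(file_path::id::equals(file_path_id))
    .exec()
    .await?;
let by_pub_id = db
    .file_path()
    .find_unique(file_path::pub_id::equals(pub_id))
    .exec()
    .await?;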

View file

@ -113,9 +113,11 @@ model Location {
@@map("location")
}
/// @shared(id: [location, id])
/// @shared(id: pub_id)
model FilePath {
id Int
id Int @id @default(autoincrement())
pub_id Bytes @unique
is_dir Boolean @default(false)
// content addressable storage id - blake3 sampled checksum
@ -134,7 +136,7 @@ model FilePath {
name String
extension String
size_in_bytes String @default("0")
size_in_bytes String @default("0")
inode Bytes // This is actually an unsigned 64 bit integer, but we don't have this type in SQLite
device Bytes // This is actually an unsigned 64 bit integer, but we don't have this type in SQLite
@ -153,23 +155,21 @@ model FilePath {
date_indexed DateTime @default(now())
// NOTE: this self relation for the file tree was causing SQLite to go to forever bed, disabling until workaround
// parent FilePath? @relation("directory_file_paths", fields: [parent_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
// children FilePath[] @relation("directory_file_paths")
parent FilePath? @relation("directory_file_paths", fields: [parent_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
children FilePath[] @relation("directory_file_paths")
key Key? @relation(fields: [key_id], references: [id])
@@id([location_id, id])
@@unique([location_id, materialized_path, name, extension])
@@unique([location_id, inode, device])
@@index([location_id])
@@map("file_path")
}
/// @shared(id: pub_id)
model Object {
id Int @id @default(autoincrement())
pub_id Bytes @unique
kind Int @default(0)
id Int @id @default(autoincrement())
pub_id Bytes @unique
kind Int @default(0)
key_id Int?
// handy ways to mark an object
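
For context, a small sketch of how the new pub_id column gets populated on the Rust side, matching the uuid crate usage visible further down in this diff (create_file_path and the indexer jobs):

use uuid::Uuid;

// pub_id is a 16-byte UUID stored as Bytes, generated once when the row is created.
let pub_id: Vec<u8> = Uuid::new_v4().as_bytes().to_vec();

// Reading it back as a Uuid, as the indexer does for parent directories:
let parsed = Uuid::from_slice(&pub_id).expect("a freshly generated pub_id is always 16 bytes");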

View file

@ -29,7 +29,7 @@ use uuid::Uuid;
// This LRU cache allows us to avoid doing a DB lookup on every request.
// The main advantage of this LRU cache is for video files. Video files are fetched in multiple chunks, and the cache prevents a DB lookup on every chunk, reducing the request time from 15-25ms to 1-10ms.
type MetadataCacheKey = (Uuid, i32, i32);
type MetadataCacheKey = (Uuid, i32);
type NameAndExtension = (PathBuf, String);
static FILE_METADATA_CACHE: Lazy<Cache<MetadataCacheKey, NameAndExtension>> =
Lazy::new(|| Cache::new(100));
@ -145,21 +145,14 @@ async fn handle_file(
HandleCustomUriError::BadRequest("Invalid number of parameters. Missing library_id!")
})?;
let location_id = path
.get(2)
.and_then(|id| id.parse::<i32>().ok())
.ok_or_else(|| {
HandleCustomUriError::BadRequest("Invalid number of parameters. Missing location_id!")
})?;
let file_path_id = path
.get(3)
.get(2)
.and_then(|id| id.parse::<i32>().ok())
.ok_or_else(|| {
HandleCustomUriError::BadRequest("Invalid number of parameters. Missing file_path_id!")
})?;
let lru_cache_key = (library_id, location_id, file_path_id);
let lru_cache_key = (library_id, file_path_id);
let (file_path_materialized_path, extension) =
if let Some(entry) = FILE_METADATA_CACHE.get(&lru_cache_key) {
@ -170,10 +163,11 @@ async fn handle_file(
.get_ctx(library_id)
.await
.ok_or_else(|| HandleCustomUriError::NotFound("library"))?;
let file_path = library
.db
.file_path()
.find_unique(file_path::location_id_id(location_id, file_path_id))
.find_unique(file_path::id::equals(file_path_id))
.include(file_path::include!({ location }))
.exec()
.await?
@ -181,7 +175,7 @@ async fn handle_file(
let lru_entry = (
Path::new(&file_path.location.path).join(&MaterializedPath::from((
location_id,
file_path.location.id,
&file_path.materialized_path,
))),
file_path.extension,
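
Since a file_path id now identifies a row on its own within a library, the metadata cache key drops the location id. A rough sketch of the new key shape, using only the names declared in this hunk:

// Old: type MetadataCacheKey = (Uuid, i32, i32);  // (library_id, location_id, file_path_id)
type MetadataCacheKey = (Uuid, i32);               // (library_id, file_path_id)
type NameAndExtension = (PathBuf, String);

let lru_cache_key: MetadataCacheKey = (library_id, file_path_id);
if let Some(entry) = FILE_METADATA_CACHE.get(&lru_cache_key) {
    // entry is (PathBuf, String): serve the cached path and extension
    // without touching the database.
}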

View file

@ -1,11 +1,6 @@
use crate::{
api::CoreEvent,
job::DynJob,
location::{file_path_helper::LastFilePathIdManager, LocationManager},
node::NodeConfigManager,
object::preview::THUMBNAIL_CACHE_DIR_NAME,
prisma::PrismaClient,
sync::SyncManager,
api::CoreEvent, job::DynJob, location::LocationManager, node::NodeConfigManager,
object::preview::THUMBNAIL_CACHE_DIR_NAME, prisma::PrismaClient, sync::SyncManager,
NodeContext,
};
@ -34,8 +29,6 @@ pub struct Library {
pub sync: Arc<SyncManager>,
/// key manager that provides encryption keys to functions that require them
pub key_manager: Arc<KeyManager>,
/// last id by location keeps track of the last id by location for the library
pub last_file_path_id_manager: Arc<LastFilePathIdManager>,
/// node_local_id holds the local ID of the node which is running the library.
pub node_local_id: i32,
/// node_context holds the node context for the node which this library is running on.

View file

@ -1,6 +1,5 @@
use crate::{
invalidate_query,
location::file_path_helper::LastFilePathIdManager,
node::Platform,
prisma::{node, PrismaClient},
sync::{SyncManager, SyncMessage},
@ -358,7 +357,6 @@ impl LibraryManager {
key_manager,
sync: Arc::new(sync_manager),
db,
last_file_path_id_manager: Arc::new(LastFilePathIdManager::new()),
node_local_id: node_data.id,
node_context,
})

View file

@ -9,30 +9,33 @@ use std::{
};
use chrono::{DateTime, Utc};
use dashmap::{mapref::entry::Entry, DashMap};
use futures::future::try_join_all;
use prisma_client_rust::{Direction, QueryError};
use prisma_client_rust::QueryError;
use serde::{Deserialize, Serialize};
use serde_json::json;
use thiserror::Error;
use tokio::{fs, io};
use tracing::error;
use uuid::Uuid;
use super::LocationId;
// File Path selectables!
file_path::select!(file_path_just_id_materialized_path {
id
pub_id
materialized_path
});
file_path::select!(file_path_for_file_identifier {
id
pub_id
materialized_path
date_created
});
file_path::select!(file_path_just_object_id { object_id });
file_path::select!(file_path_for_object_validator {
id
pub_id
materialized_path
integrity_checksum
location: select {
@ -248,174 +251,74 @@ pub enum FilePathError {
IOError(#[from] io::Error),
}
#[derive(Debug)]
pub struct LastFilePathIdManager {
last_id_by_location: DashMap<LocationId, i32>,
}
#[cfg(feature = "location-watcher")]
pub async fn create_file_path(
Library { db, sync, .. }: &Library,
MaterializedPath {
materialized_path,
is_dir,
location_id,
name,
extension,
}: MaterializedPath<'_>,
parent_id: Option<(i32, Vec<u8>)>,
cas_id: Option<String>,
metadata: FilePathMetadata,
) -> Result<file_path::Data, FilePathError> {
use crate::sync;
impl Default for LastFilePathIdManager {
fn default() -> Self {
Self {
last_id_by_location: DashMap::with_capacity(4),
}
}
}
let pub_id = Uuid::new_v4().as_bytes().to_vec();
impl LastFilePathIdManager {
pub fn new() -> Self {
Default::default()
}
let params = [
("cas_id", json!(cas_id)),
("materialized_path", json!(materialized_path)),
("name", json!(name)),
("extension", json!(extension)),
("inode", json!(metadata.inode.to_le_bytes())),
("device", json!(metadata.device.to_le_bytes())),
("is_dir", json!(is_dir)),
("date_created", json!(metadata.created_at)),
("date_modified", json!(metadata.modified_at)),
("size_in_bytes", json!(metadata.size_in_bytes.to_string())),
]
.into_iter()
.map(Some)
.chain([parent_id
.clone()
.map(|(_, pub_id)| ("parent", json!(sync::file_path::SyncId { pub_id })))])
.flatten()
.collect::<Vec<_>>();
pub async fn sync(
&self,
location_id: LocationId,
db: &PrismaClient,
) -> Result<(), FilePathError> {
if let Some(mut id_ref) = self.last_id_by_location.get_mut(&location_id) {
*id_ref = Self::fetch_max_file_path_id(location_id, db).await?;
}
let created_path = sync
.write_op(
db,
sync.unique_shared_create(
sync::file_path::SyncId {
pub_id: pub_id.clone(),
},
params,
),
db.file_path().create(
pub_id,
location::id::equals(location_id),
materialized_path.into_owned(),
name.into_owned(),
extension.into_owned(),
metadata.inode.to_le_bytes().into(),
metadata.device.to_le_bytes().into(),
vec![
file_path::cas_id::set(cas_id),
file_path::parent_id::set(parent_id.map(|(id, _)| id)),
file_path::is_dir::set(is_dir),
file_path::size_in_bytes::set(metadata.size_in_bytes.to_string()),
file_path::date_created::set(metadata.created_at.into()),
file_path::date_modified::set(metadata.modified_at.into()),
],
),
)
.await?;
Ok(())
}
pub async fn increment(
&self,
location_id: LocationId,
by: i32,
db: &PrismaClient,
) -> Result<i32, FilePathError> {
Ok(match self.last_id_by_location.entry(location_id) {
Entry::Occupied(mut entry) => {
let first_free_id = *entry.get() + 1;
*entry.get_mut() += by + 1;
first_free_id
}
Entry::Vacant(entry) => {
// I wish I could use `or_try_insert_with` method instead of this crappy match,
// but we don't have async closures yet ):
let first_free_id = Self::fetch_max_file_path_id(location_id, db).await? + 1;
entry.insert(first_free_id + by);
first_free_id
}
})
}
async fn fetch_max_file_path_id(
location_id: LocationId,
db: &PrismaClient,
) -> Result<i32, FilePathError> {
Ok(db
.file_path()
.find_first(vec![file_path::location_id::equals(location_id)])
.order_by(file_path::id::order(Direction::Desc))
.select(file_path::select!({ id }))
.exec()
.await?
.map(|r| r.id)
.unwrap_or(0))
}
#[cfg(feature = "location-watcher")]
pub async fn create_file_path(
&self,
Library { db, sync, .. }: &Library,
MaterializedPath {
materialized_path,
is_dir,
location_id,
name,
extension,
}: MaterializedPath<'_>,
parent_id: Option<i32>,
cas_id: Option<String>,
metadata: FilePathMetadata,
) -> Result<file_path::Data, FilePathError> {
// Keeping a reference in that map for the entire duration of the function, so we keep it locked
use crate::sync;
let mut last_id_ref = match self.last_id_by_location.entry(location_id) {
Entry::Occupied(ocupied) => ocupied.into_ref(),
Entry::Vacant(vacant) => {
let id = Self::fetch_max_file_path_id(location_id, db).await?;
vacant.insert(id)
}
};
let location = db
.location()
.find_unique(location::id::equals(location_id))
.select(location::select!({ id pub_id }))
.exec()
.await?
.unwrap();
let next_id = *last_id_ref + 1;
let params = [
("cas_id", json!(cas_id)),
("materialized_path", json!(materialized_path)),
("name", json!(name)),
("extension", json!(extension)),
("size_in_bytes", json!(metadata.size_in_bytes.to_string())),
("inode", json!(metadata.inode.to_le_bytes())),
("device", json!(metadata.device.to_le_bytes())),
("is_dir", json!(is_dir)),
("date_created", json!(metadata.created_at)),
("date_modified", json!(metadata.modified_at)),
]
.into_iter()
.map(Some)
.chain([parent_id.map(|parent_id| {
(
"parent_id",
json!(sync::file_path::SyncId {
location: sync::location::SyncId {
pub_id: location.pub_id.clone()
},
id: parent_id
}),
)
})])
.flatten()
.collect::<Vec<_>>();
let created_path = sync
.write_op(
db,
sync.unique_shared_create(
sync::file_path::SyncId {
location: sync::location::SyncId {
pub_id: location.pub_id.clone(),
},
id: next_id,
},
params,
),
db.file_path().create(
next_id,
location::id::equals(location_id),
materialized_path.into_owned(),
name.into_owned(),
extension.into_owned(),
metadata.inode.to_le_bytes().into(),
metadata.device.to_le_bytes().into(),
vec![
file_path::cas_id::set(cas_id),
file_path::parent_id::set(parent_id),
file_path::is_dir::set(is_dir),
file_path::size_in_bytes::set(metadata.size_in_bytes.to_string()),
file_path::date_created::set(metadata.created_at.into()),
file_path::date_modified::set(metadata.modified_at.into()),
],
),
)
.await?;
*last_id_ref = next_id;
Ok(created_path)
}
Ok(created_path)
}
pub fn subtract_location_path(
@ -649,26 +552,33 @@ pub async fn ensure_sub_path_is_directory(
pub async fn retain_file_paths_in_location(
location_id: LocationId,
to_retain: Vec<i32>,
to_retain: Vec<Vec<u8>>,
maybe_parent_file_path: Option<file_path_just_id_materialized_path::Data>,
db: &PrismaClient,
) -> Result<i64, FilePathError> {
let mut to_delete_params = vec![
let to_delete_params = vec![
file_path::location_id::equals(location_id),
file_path::id::not_in_vec(to_retain),
];
if let Some(parent_file_path) = maybe_parent_file_path {
// If the parent_materialized_path is not the root path, we only delete file paths that start with the parent path
if parent_file_path.materialized_path != MAIN_SEPARATOR_STR {
to_delete_params.push(file_path::materialized_path::starts_with(
parent_file_path.materialized_path,
));
file_path::pub_id::not_in_vec(to_retain),
]
.into_iter()
.map(Some)
.chain([{
if let Some(parent_file_path) = maybe_parent_file_path {
// If the parent_materialized_path is not the root path, we only delete file paths that start with the parent path
Some(
if parent_file_path.materialized_path != MAIN_SEPARATOR_STR {
file_path::materialized_path::starts_with(parent_file_path.materialized_path)
} else {
// If the parent_materialized_path is the root path, we fetch children using the parent id
file_path::parent_id::equals(Some(parent_file_path.id))
},
)
} else {
// If the parent_materialized_path is the root path, we fetch children using the parent id
to_delete_params.push(file_path::parent_id::equals(Some(parent_file_path.id)));
None
}
}
}])
.flatten()
.collect();
db.file_path()
.delete_many(to_delete_params)
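
With LastFilePathIdManager gone, file path creation is a plain free function. A hedged usage sketch of the new create_file_path signature, mirroring the create_dir call site in the location watcher further down (parent_directory and metadata are values the caller already holds):

let created_path = create_file_path(
    library,
    materialized_path,
    // The parent is now passed as (local id, pub_id) instead of a bare id.
    Some((parent_directory.id, parent_directory.pub_id)),
    None, // cas_id
    FilePathMetadata {
        inode,
        device,
        size_in_bytes: metadata.len(),
        created_at: metadata.created()?.into(),
        modified_at: metadata.modified()?.into(),
    },
)
.await?;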

View file

@ -7,7 +7,7 @@ use crate::{
filter_file_paths_by_many_full_path_params, retain_file_paths_in_location,
MaterializedPath,
},
prisma::location,
prisma::{file_path, location},
};
use std::{collections::HashMap, path::Path};
@ -16,6 +16,7 @@ use chrono::Utc;
use itertools::Itertools;
use tokio::time::Instant;
use tracing::error;
use uuid::Uuid;
use super::{
execute_indexer_step, finalize_indexer,
@ -46,11 +47,7 @@ impl StatefulJob for IndexerJob {
/// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`.
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
let Library {
last_file_path_id_manager,
db,
..
} = &ctx.library;
let Library { db, .. } = &ctx.library;
let location_id = state.init.location.id;
let location_path = Path::new(&state.init.location.path);
@ -90,7 +87,10 @@ impl StatefulJob for IndexerJob {
.expect("Sub path should already exist in the database");
// If we're operating with a sub_path, then we have to put its id in the `dirs_ids` map
dirs_ids.insert(full_path.clone(), sub_path_file_path.id);
dirs_ids.insert(
full_path.clone(),
(sub_path_file_path.id, sub_path_file_path.pub_id.clone()),
);
(full_path, Some(sub_path_file_path))
} else {
@ -132,7 +132,11 @@ impl StatefulJob for IndexerJob {
.await
.map_err(IndexerError::from)?,
)
.select(file_path_just_id_materialized_path::select())
.select(file_path::select!({
id
pub_id
materialized_path
}))
.exec()
.await?
.into_iter()
@ -142,7 +146,7 @@ impl StatefulJob for IndexerJob {
location_id,
&file_path.materialized_path,
))),
file_path.id,
(file_path.id, file_path.pub_id),
)
}),
);
@ -150,19 +154,17 @@ impl StatefulJob for IndexerJob {
// Removing all other file paths that are not in the filesystem anymore
let removed_paths = retain_file_paths_in_location(
location_id,
dirs_ids.values().copied().collect(),
dirs_ids
.values()
.cloned()
.map(|(_, pub_id)| pub_id)
.collect(),
maybe_parent_file_path,
db,
)
.await
.map_err(IndexerError::from)?;
// Syncing the last file path id manager, as we potentially just removed a bunch of ids
last_file_path_id_manager
.sync(location_id, db)
.await
.map_err(IndexerError::from)?;
let mut new_paths = found_paths
.into_iter()
.filter_map(|entry| {
@ -181,13 +183,18 @@ impl StatefulJob for IndexerJob {
(!dirs_ids.contains_key(&entry.path)).then(|| {
IndexerJobStepEntry {
materialized_path,
file_id: 0, // To be set later
file_pub_id: Uuid::new_v4(),
parent_id: entry.path.parent().and_then(|parent_dir| {
/***************************************************************
* If we're dealing with a new path whose parent already *
* exists, we fetch its parent id from our `dirs_ids` map *
**************************************************************/
dirs_ids.get(parent_dir).copied()
dirs_ids
.get(parent_dir)
// SAFETY: We created this pub_id before, so it should be valid
.map(|(id, pub_id)| {
(*id, Uuid::from_slice(pub_id).unwrap())
})
}),
full_path: entry.path,
metadata: entry.metadata,
@ -200,28 +207,6 @@ impl StatefulJob for IndexerJob {
let total_paths = new_paths.len();
// grab the next id so we can increment in memory for batch inserting
let first_file_id = last_file_path_id_manager
.increment(location_id, total_paths as i32, db)
.await
.map_err(IndexerError::from)?;
new_paths
.iter_mut()
.zip(first_file_id..)
.for_each(|(entry, file_id)| {
// If the `parent_id` is still none here, is because the parent of this entry is also
// a new one in the DB
if entry.parent_id.is_none() {
entry.parent_id = entry
.full_path
.parent()
.and_then(|parent_dir| dirs_ids.get(parent_dir).copied());
}
entry.file_id = file_id;
dirs_ids.insert(entry.full_path.clone(), file_id);
});
state.data = Some(IndexerJobData {
db_write_start: Utc::now(),
scan_read_time: scan_start.elapsed(),
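
The dirs_ids map now stores (id, pub_id) pairs, so a newly discovered entry can reference its parent both for the local parent_id column and for the sync payload. A condensed sketch of the parent lookup this hunk performs (dirs_ids is the HashMap<PathBuf, (i32, Vec<u8>)> built earlier in init):

let parent_id = entry.path.parent().and_then(|parent_dir| {
    dirs_ids
        .get(parent_dir)
        // The pub_id was generated with Uuid::new_v4, so it is always 16 bytes.
        .map(|(id, pub_id)| (*id, Uuid::from_slice(pub_id).unwrap()))
});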

View file

@ -22,6 +22,7 @@ use serde_json::json;
use thiserror::Error;
use tokio::io;
use tracing::info;
use uuid::Uuid;
use super::{
file_path_helper::{FilePathError, FilePathMetadata, MaterializedPath},
@ -72,8 +73,8 @@ pub type IndexerJobStep = Vec<IndexerJobStepEntry>;
pub struct IndexerJobStepEntry {
full_path: PathBuf,
materialized_path: MaterializedPath<'static>,
file_id: i32,
parent_id: Option<i32>,
file_pub_id: Uuid,
parent_id: Option<(i32, Uuid)>,
metadata: FilePathMetadata,
}
@ -166,29 +167,37 @@ async fn execute_indexer_step(
(
sync.unique_shared_create(
sync::file_path::SyncId {
id: entry.file_id,
location: sync::location::SyncId {
pub_id: location.pub_id.clone(),
},
pub_id: entry.file_pub_id.as_bytes().to_vec(),
},
[
("materialized_path", json!(materialized_path.clone())),
("name", json!(name.clone())),
("is_dir", json!(is_dir)),
("extension", json!(extension.clone())),
("inode", json!(entry.metadata.inode.to_le_bytes())),
("device", json!(entry.metadata.device.to_le_bytes())),
(
"size_in_bytes",
json!(entry.metadata.size_in_bytes.to_string()),
),
("inode", json!(entry.metadata.inode.to_le_bytes())),
("device", json!(entry.metadata.device.to_le_bytes())),
("parent_id", json!(entry.parent_id)),
("date_created", json!(entry.metadata.created_at)),
("date_modified", json!(entry.metadata.modified_at)),
],
]
.into_iter()
.map(Some)
.chain([entry.parent_id.map(|(_, pub_id)| {
(
"parent",
json!(sync::file_path::SyncId {
pub_id: pub_id.as_bytes().to_vec()
}),
)
})])
.flatten()
.collect::<Vec<_>>(),
),
file_path::create_unchecked(
entry.file_id,
entry.file_pub_id.as_bytes().to_vec(),
location.id,
materialized_path.into_owned(),
name.into_owned(),
@ -198,10 +207,16 @@ async fn execute_indexer_step(
vec![
is_dir::set(is_dir),
size_in_bytes::set(entry.metadata.size_in_bytes.to_string()),
parent_id::set(entry.parent_id),
parent_id::set(entry.parent_id.map(|(id, _)| id)),
date_created::set(entry.metadata.created_at.into()),
date_modified::set(entry.metadata.modified_at.into()),
],
], // .into_iter()
// .map(Some)
// .chain([entry
// .parent_id
// .map(|id| parent::connect(pub_id::equals(id.as_bytes().to_vec())))])
// .flatten()
// .collect(),
),
)
})
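
Each indexer step entry now carries its own pre-generated pub_id, and the parent is an (id, Uuid) pair; in the sync params the parent is serialized as a file_path SyncId rather than a raw integer. A condensed sketch of both shapes, with field names taken from this hunk:

pub struct IndexerJobStepEntry {
    full_path: PathBuf,
    materialized_path: MaterializedPath<'static>,
    file_pub_id: Uuid,              // generated up front with Uuid::new_v4()
    parent_id: Option<(i32, Uuid)>, // (local parent id, parent pub_id)
    metadata: FilePathMetadata,
}

// Optional "parent" entry appended to the sync params:
let parent_param = entry.parent_id.map(|(_, pub_id)| {
    (
        "parent",
        json!(sync::file_path::SyncId {
            pub_id: pub_id.as_bytes().to_vec()
        }),
    )
});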

View file

@ -21,6 +21,7 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tracing::error;
use uuid::Uuid;
use super::{
execute_indexer_step, finalize_indexer, location_with_indexer_rules,
@ -66,11 +67,7 @@ impl StatefulJob for ShallowIndexerJob {
/// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`.
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
let Library {
last_file_path_id_manager,
db,
..
} = &ctx.library;
let Library { db, .. } = &ctx.library;
let location_id = state.init.location.id;
let location_path = Path::new(&state.init.location.path);
@ -157,13 +154,17 @@ impl StatefulJob for ShallowIndexerJob {
.exec()
.await?
.into_iter()
.map(|file_path| (file_path.materialized_path, file_path.id))
.map(|file_path| (file_path.materialized_path, file_path.pub_id))
.unzip::<_, _, HashSet<_>, Vec<_>>();
let parent_id = parent_file_path.id;
// SAFETY: We generated this uuid before, so it's valid
let parent_file_path_ids = (
parent_file_path.id,
Uuid::from_slice(&parent_file_path.pub_id).unwrap(),
);
// Adding our parent path id
to_retain.push(parent_id);
to_retain.push(parent_file_path.pub_id.clone());
// Removing all other file paths that are not in the filesystem anymore
let removed_paths =
@ -171,14 +172,8 @@ impl StatefulJob for ShallowIndexerJob {
.await
.map_err(IndexerError::from)?;
// Syncing the last file path id manager, as we potentially just removed a bunch of ids
last_file_path_id_manager
.sync(location_id, db)
.await
.map_err(IndexerError::from)?;
// Filter out paths that are already in the databases
let mut new_paths = found_paths
let new_paths = found_paths
.into_iter()
.filter_map(|entry| {
MaterializedPath::new(location_id, location_path, &entry.path, entry.is_dir)
@ -193,8 +188,8 @@ impl StatefulJob for ShallowIndexerJob {
.then_some(IndexerJobStepEntry {
full_path: entry.path,
materialized_path,
file_id: 0, // To be set later
parent_id: Some(parent_id),
file_pub_id: Uuid::new_v4(),
parent_id: Some(parent_file_path_ids),
metadata: entry.metadata,
})
},
@ -206,21 +201,6 @@ impl StatefulJob for ShallowIndexerJob {
let total_paths = new_paths.len();
// grab the next id so we can increment in memory for batch inserting
let first_file_id = last_file_path_id_manager
.increment(location_id, total_paths as i32, db)
.await
.map_err(IndexerError::from)?;
new_paths
.iter_mut()
.zip(first_file_id..)
.for_each(|(entry, file_id)| {
entry.file_id = file_id;
});
let total_paths = new_paths.len();
state.data = Some(IndexerJobData {
db_write_start: Utc::now(),
scan_read_time: scan_start.elapsed(),

View file

@ -4,9 +4,10 @@ use crate::{
location::{
delete_directory,
file_path_helper::{
extract_materialized_path, file_path_with_object, filter_existing_file_path_params,
get_parent_dir, get_parent_dir_id, loose_find_existing_file_path_params, FilePathError,
FilePathMetadata, MaterializedPath,
create_file_path, extract_materialized_path, file_path_with_object,
filter_existing_file_path_params, get_parent_dir, get_parent_dir_id,
loose_find_existing_file_path_params, FilePathError, FilePathMetadata,
MaterializedPath,
},
find_location, location_with_indexer_rules,
manager::LocationManagerError,
@ -106,22 +107,20 @@ pub(super) async fn create_dir(
return Ok(())
};
let created_path = library
.last_file_path_id_manager
.create_file_path(
library,
materialized_path,
Some(parent_directory.id),
None,
FilePathMetadata {
inode,
device,
size_in_bytes: metadata.len(),
created_at: metadata.created()?.into(),
modified_at: metadata.modified()?.into(),
},
)
.await?;
let created_path = create_file_path(
library,
materialized_path,
Some((parent_directory.id, parent_directory.pub_id)),
None,
FilePathMetadata {
inode,
device,
size_in_bytes: metadata.len(),
created_at: metadata.created()?.into(),
modified_at: metadata.modified()?.into(),
},
)
.await?;
info!("Created path: {}", created_path.materialized_path);
@ -180,22 +179,20 @@ pub(super) async fn create_file(
fs_metadata,
} = FileMetadata::new(&location_path, &materialized_path).await?;
let created_file = library
.last_file_path_id_manager
.create_file_path(
library,
materialized_path,
Some(parent_directory.id),
Some(cas_id.clone()),
FilePathMetadata {
inode,
device,
size_in_bytes: metadata.len(),
created_at: metadata.created()?.into(),
modified_at: metadata.modified()?.into(),
},
)
.await?;
let created_file = create_file_path(
library,
materialized_path,
Some((parent_directory.id, parent_directory.pub_id)),
Some(cas_id.clone()),
FilePathMetadata {
inode,
device,
size_in_bytes: metadata.len(),
created_at: metadata.created()?.into(),
modified_at: metadata.modified()?.into(),
},
)
.await?;
info!("Created path: {}", created_file.materialized_path);
@ -229,7 +226,7 @@ pub(super) async fn create_file(
db.file_path()
.update(
file_path::location_id_id(location_id, created_file.id),
file_path::pub_id::equals(created_file.pub_id),
vec![file_path::object_id::set(Some(object.id))],
)
.exec()
@ -407,10 +404,7 @@ async fn inner_update_file(
.map(|(field, value)| {
sync.shared_update(
sync::file_path::SyncId {
location: sync::location::SyncId {
pub_id: location.pub_id.clone(),
},
id: file_path.id,
pub_id: file_path.pub_id.clone(),
},
field,
value,
@ -418,7 +412,7 @@ async fn inner_update_file(
})
.collect(),
db.file_path().update(
file_path::location_id_id(location_id, file_path.id),
file_path::pub_id::equals(file_path.pub_id.clone()),
db_params,
),
),
@ -573,12 +567,10 @@ pub(super) async fn rename(
library
.db
.file_path()
.update(
file_path::location_id_id(file_path.location_id, file_path.id),
update_params,
)
.update(file_path::pub_id::equals(file_path.pub_id), update_params)
.exec()
.await?;
invalidate_query!(library, "locations.getExplorerData");
}
@ -631,7 +623,7 @@ pub(super) async fn remove_by_file_path(
library
.db
.file_path()
.delete(file_path::location_id_id(location_id, file_path.id))
.delete(file_path::pub_id::equals(file_path.pub_id.clone()))
.exec()
.await?;
@ -652,12 +644,6 @@ pub(super) async fn remove_by_file_path(
Err(e) => return Err(e.into()),
}
// If the file paths we just removed were the last ids in the DB, we decresed the last id from the id manager
library
.last_file_path_id_manager
.sync(location_id, &library.db)
.await?;
invalidate_query!(library, "locations.getExplorerData");
Ok(())
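
Every place the watcher used the old composite accessor now goes through one of the new unique accessors. A minimal sketch of the update and delete shapes used in this file (created_file, file_path and object are rows the surrounding code already fetched):

// Link a freshly created file path to its object by pub_id.
db.file_path()
    .update(
        file_path::pub_id::equals(created_file.pub_id),
        vec![file_path::object_id::set(Some(object.id))],
    )
    .exec()
    .await?;

// Remove a file path the same way.
db.file_path()
    .delete(file_path::pub_id::equals(file_path.pub_id.clone()))
    .exec()
    .await?;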

View file

@ -1,9 +1,12 @@
use crate::{
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
library::Library,
location::file_path_helper::{
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
file_path_for_file_identifier, MaterializedPath,
location::{
file_path_helper::{
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
file_path_for_file_identifier, MaterializedPath,
},
LocationId,
},
prisma::{file_path, location, PrismaClient},
};
@ -19,7 +22,7 @@ use tracing::info;
use super::{
finalize_file_identifier, process_identifier_file_paths, FileIdentifierJobError,
FileIdentifierReport, FilePathIdAndLocationIdCursor, CHUNK_SIZE,
FileIdentifierReport, FilePathIdCursor, CHUNK_SIZE,
};
pub const FILE_IDENTIFIER_JOB_NAME: &str = "file_identifier";
@ -48,7 +51,7 @@ impl Hash for FileIdentifierJobInit {
#[derive(Serialize, Deserialize)]
pub struct FileIdentifierJobState {
cursor: FilePathIdAndLocationIdCursor,
cursor: FilePathIdCursor,
report: FileIdentifierReport,
maybe_sub_materialized_path: Option<MaterializedPath<'static>>,
}
@ -97,10 +100,7 @@ impl StatefulJob for FileIdentifierJob {
total_orphan_paths: orphan_count,
..Default::default()
},
cursor: FilePathIdAndLocationIdCursor {
file_path_id: -1,
location_id,
},
cursor: FilePathIdCursor { file_path_id: -1 },
maybe_sub_materialized_path,
});
@ -162,8 +162,13 @@ impl StatefulJob for FileIdentifierJob {
let location = &state.init.location;
// get chunk of orphans to process
let file_paths =
get_orphan_file_paths(&ctx.library.db, cursor, maybe_sub_materialized_path).await?;
let file_paths = get_orphan_file_paths(
&ctx.library.db,
location.id,
cursor,
maybe_sub_materialized_path,
)
.await?;
process_identifier_file_paths(
self.name(),
@ -190,7 +195,7 @@ impl StatefulJob for FileIdentifierJob {
}
fn orphan_path_filters(
location_id: i32,
location_id: LocationId,
file_path_id: Option<i32>,
maybe_sub_materialized_path: &Option<MaterializedPath<'_>>,
) -> Vec<file_path::WhereParam> {
@ -215,7 +220,7 @@ fn orphan_path_filters(
async fn count_orphan_file_paths(
db: &PrismaClient,
location_id: i32,
location_id: LocationId,
maybe_sub_materialized_path: &Option<MaterializedPath<'_>>,
) -> Result<usize, prisma_client_rust::QueryError> {
db.file_path()
@ -231,7 +236,8 @@ async fn count_orphan_file_paths(
async fn get_orphan_file_paths(
db: &PrismaClient,
cursor: &FilePathIdAndLocationIdCursor,
location_id: LocationId,
cursor: &FilePathIdCursor,
maybe_sub_materialized_path: &Option<MaterializedPath<'_>>,
) -> Result<Vec<file_path_for_file_identifier::Data>, prisma_client_rust::QueryError> {
info!(
@ -240,7 +246,7 @@ async fn get_orphan_file_paths(
);
db.file_path()
.find_many(orphan_path_filters(
cursor.location_id,
location_id,
Some(cursor.file_path_id),
maybe_sub_materialized_path,
))

View file

@ -78,14 +78,13 @@ impl FileMetadata {
}
#[derive(Serialize, Deserialize, Debug)]
struct FilePathIdAndLocationIdCursor {
struct FilePathIdCursor {
file_path_id: i32,
location_id: i32,
}
impl From<&FilePathIdAndLocationIdCursor> for file_path::UniqueWhereParam {
fn from(cursor: &FilePathIdAndLocationIdCursor) -> Self {
file_path::location_id_id(cursor.location_id, cursor.file_path_id)
impl From<&FilePathIdCursor> for file_path::UniqueWhereParam {
fn from(cursor: &FilePathIdCursor) -> Self {
file_path::id::equals(cursor.file_path_id)
}
}
@ -128,20 +127,17 @@ async fn identifier_job_step(
db,
file_path_metas
.iter()
.map(|(id, (meta, _))| {
.map(|(id, (meta, file_path))| {
(
sync.shared_update(
sync::file_path::SyncId {
id: *id,
location: sync::location::SyncId {
pub_id: location.pub_id.clone(),
},
pub_id: file_path.pub_id.clone(),
},
"cas_id",
json!(&meta.cas_id),
),
db.file_path().update(
file_path::location_id_id(location.id, *id),
file_path::id::equals(*id),
vec![file_path::cas_id::set(Some(meta.cas_id.clone()))],
),
)
@ -179,7 +175,7 @@ async fn identifier_job_step(
db,
file_path_metas
.iter()
.flat_map(|(id, (meta, _))| {
.flat_map(|(_id, (meta, file_path))| {
existing_objects
.iter()
.find(|o| {
@ -187,14 +183,13 @@ async fn identifier_job_step(
.iter()
.any(|fp| fp.cas_id.as_ref() == Some(&meta.cas_id))
})
.map(|o| (*id, o))
.map(|o| (file_path, o))
})
.map(|(id, object)| {
.map(|(file_path, object)| {
let (crdt_op, db_op) = file_path_object_connect_ops(
id,
// SAFETY: This pub_id is generated by the uuid lib, but we have to store bytes in sqlite
Uuid::from_slice(&object.pub_id).unwrap(),
location,
file_path.id,
file_path.pub_id.clone(),
object.pub_id.clone(),
sync,
db,
);
@ -232,11 +227,10 @@ async fn identifier_job_step(
file_paths_requiring_new_object
.iter()
.map(|(id, (meta, fp))| {
let pub_id = Uuid::new_v4();
let pub_id_vec = pub_id.as_bytes().to_vec();
let object_pub_id = Uuid::new_v4().as_bytes().to_vec();
let sync_id = || sync::object::SyncId {
pub_id: pub_id_vec.clone(),
pub_id: object_pub_id.clone(),
};
let size = meta.fs_metadata.len().to_string();
@ -256,7 +250,7 @@ async fn identifier_job_step(
)
.collect::<Vec<_>>(),
object::create_unchecked(
pub_id_vec.clone(),
object_pub_id.clone(),
vec![
object::date_created::set(fp.date_created),
object::kind::set(kind),
@ -265,8 +259,13 @@ async fn identifier_job_step(
);
(object_creation_args, {
let (crdt_op, db_op) =
file_path_object_connect_ops(*id, pub_id, location, sync, db);
let (crdt_op, db_op) = file_path_object_connect_ops(
*id,
fp.pub_id.clone(),
object_pub_id,
sync,
db,
);
(crdt_op, db_op.select(file_path::select!({ id })))
})
@ -307,28 +306,30 @@ async fn identifier_job_step(
fn file_path_object_connect_ops<'db>(
file_path_id: i32,
object_id: Uuid,
location: &location::Data,
file_path_pub_id: Vec<u8>,
object_pub_id: Vec<u8>,
sync: &SyncManager,
db: &'db PrismaClient,
) -> (CRDTOperation, file_path::Update<'db>) {
info!("Connecting <FilePath id={file_path_id}> to <Object pub_id={object_id}'>");
info!(
"Connecting <FilePath id={file_path_id}> to <Object pub_id={}'>",
Uuid::from_slice(&object_pub_id).unwrap()
);
(
sync.shared_update(
sync::file_path::SyncId {
id: file_path_id,
location: sync::location::SyncId {
pub_id: location.pub_id.clone(),
},
pub_id: file_path_pub_id,
},
"object",
json!({ "pub_id": object_id }),
json!(sync::object::SyncId {
pub_id: object_pub_id.clone()
}),
),
db.file_path().update(
file_path::location_id_id(location.id, file_path_id),
file_path::id::equals(file_path_id),
vec![file_path::object::connect(object::pub_id::equals(
object_id.as_bytes().to_vec(),
object_pub_id,
))],
),
)
@ -339,7 +340,7 @@ async fn process_identifier_file_paths(
location: &location::Data,
file_paths: &[file_path_for_file_identifier::Data],
step_number: usize,
cursor: &mut FilePathIdAndLocationIdCursor,
cursor: &mut FilePathIdCursor,
report: &mut FileIdentifierReport,
ctx: WorkerContext,
) -> Result<(), JobError> {
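
The connect helper now takes the pub_id byte vectors of both sides instead of a location and an Object Uuid. A trimmed sketch of the new call shape, as used by the identifier step above:

let (crdt_op, db_op) = file_path_object_connect_ops(
    file_path.id,             // still needed for the local DB update by id
    file_path.pub_id.clone(), // file_path side of the shared sync op
    object_pub_id.clone(),    // object side of the shared sync op
    sync,
    db,
);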

View file

@ -1,9 +1,12 @@
use crate::{
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
library::Library,
location::file_path_helper::{
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
file_path_for_file_identifier, get_existing_file_path_id, MaterializedPath,
location::{
file_path_helper::{
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
file_path_for_file_identifier, get_existing_file_path_id, MaterializedPath,
},
LocationId,
},
prisma::{file_path, location, PrismaClient},
};
@ -19,7 +22,7 @@ use tracing::info;
use super::{
finalize_file_identifier, process_identifier_file_paths, FileIdentifierJobError,
FileIdentifierReport, FilePathIdAndLocationIdCursor, CHUNK_SIZE,
FileIdentifierReport, FilePathIdCursor, CHUNK_SIZE,
};
pub const SHALLOW_FILE_IDENTIFIER_JOB_NAME: &str = "shallow_file_identifier";
@ -45,7 +48,7 @@ impl Hash for ShallowFileIdentifierJobInit {
#[derive(Serialize, Deserialize)]
pub struct ShallowFileIdentifierJobState {
cursor: FilePathIdAndLocationIdCursor,
cursor: FilePathIdCursor,
report: FileIdentifierReport,
sub_path_id: i32,
}
@ -104,10 +107,7 @@ impl StatefulJob for ShallowFileIdentifierJob {
total_orphan_paths: orphan_count,
..Default::default()
},
cursor: FilePathIdAndLocationIdCursor {
file_path_id: -1,
location_id,
},
cursor: FilePathIdCursor { file_path_id: -1 },
sub_path_id,
});
@ -164,7 +164,8 @@ impl StatefulJob for ShallowFileIdentifierJob {
let location = &state.init.location;
// get chunk of orphans to process
let file_paths = get_orphan_file_paths(&ctx.library.db, cursor, *sub_path_id).await?;
let file_paths =
get_orphan_file_paths(&ctx.library.db, location.id, cursor, *sub_path_id).await?;
process_identifier_file_paths(
self.name(),
@ -191,7 +192,7 @@ impl StatefulJob for ShallowFileIdentifierJob {
}
fn orphan_path_filters(
location_id: i32,
location_id: LocationId,
file_path_id: Option<i32>,
sub_path_id: i32,
) -> Vec<file_path::WhereParam> {
@ -211,7 +212,7 @@ fn orphan_path_filters(
async fn count_orphan_file_paths(
db: &PrismaClient,
location_id: i32,
location_id: LocationId,
sub_path_id: i32,
) -> Result<usize, prisma_client_rust::QueryError> {
db.file_path()
@ -223,7 +224,8 @@ async fn count_orphan_file_paths(
async fn get_orphan_file_paths(
db: &PrismaClient,
cursor: &FilePathIdAndLocationIdCursor,
location_id: LocationId,
cursor: &FilePathIdCursor,
sub_path_id: i32,
) -> Result<Vec<file_path_for_file_identifier::Data>, prisma_client_rust::QueryError> {
info!(
@ -232,7 +234,7 @@ async fn get_orphan_file_paths(
);
db.file_path()
.find_many(orphan_path_filters(
cursor.location_id,
location_id,
Some(cursor.file_path_id),
sub_path_id,
))

View file

@ -65,7 +65,7 @@ pub async fn context_menu_fs_info(
) -> Result<FsInfo, JobError> {
let path_data = db
.file_path()
.find_unique(file_path::location_id_id(location_id, path_id))
.find_unique(file_path::id::equals(path_id))
.include(file_path_with_object::include())
.exec()
.await?

View file

@ -104,16 +104,13 @@ impl StatefulJob for ObjectValidatorJob {
db,
sync.shared_update(
sync::file_path::SyncId {
id: file_path.id,
location: sync::location::SyncId {
pub_id: file_path.location.pub_id.clone(),
},
pub_id: file_path.pub_id.clone(),
},
"integrity_checksum",
json!(&checksum),
),
db.file_path().update(
file_path::location_id_id(file_path.location.id, file_path.id),
file_path::id::equals(file_path.id),
vec![file_path::integrity_checksum::set(Some(checksum))],
),
)

View file

@ -1,4 +1,4 @@
use crate::prisma::*;
use crate::{prisma::*, sync};
use std::{collections::HashMap, sync::Arc};
@ -200,20 +200,26 @@ impl SyncManager {
match ModelSyncData::from_op(op.typ.clone()).unwrap() {
ModelSyncData::FilePath(id, shared_op) => {
let location = db
.location()
.find_unique(location::pub_id::equals(id.location.pub_id))
.select(location::select!({ id }))
.exec()
.await?
.unwrap();
// let location = db
// .location()
// .find_unique(location::pub_id::equals(id.location.pub_id))
// .select(location::select!({ id }))
// .exec()
// .await?
// .unwrap();
match shared_op {
SharedOperationData::Create(SharedOperationCreateData::Unique(mut data)) => {
db.file_path()
.create(
id.id,
location::id::equals(location.id),
id.pub_id,
location::pub_id::equals(
serde_json::from_value::<sync::location::SyncId>(
data.remove("location").unwrap(),
)
.unwrap()
.pub_id,
),
serde_json::from_value(data.remove("materialized_path").unwrap())
.unwrap(),
serde_json::from_value(data.remove("name").unwrap()).unwrap(),
@ -236,7 +242,7 @@ impl SyncManager {
self.db
.file_path()
.update(
file_path::location_id_id(location.id, id.id),
file_path::pub_id::equals(id.pub_id),
vec![file_path::SetParam::deserialize(&field, value).unwrap()],
)
.exec()
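
On the apply side, the location is no longer part of the FilePath SyncId; the create payload carries it as a `location` field that is deserialized back into a location SyncId. A hedged sketch of that extraction (data is the map of created fields taken out of the operation, as in the hunk above):

// Pull the owning location's pub_id out of the create payload...
let location_pub_id = serde_json::from_value::<sync::location::SyncId>(
    data.remove("location").unwrap(),
)
.unwrap()
.pub_id;

// ...and create the row against location::pub_id::equals(location_pub_id),
// with id.pub_id as the FilePath's own pub_id, exactly as shown above.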

View file

@ -135,7 +135,7 @@ export type FileEncryptorJobInit = { location_id: number, path_id: number, key_u
export type FileEraserJobInit = { location_id: number, path_id: number, passes: string }
export type FilePath = { id: number, is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, size_in_bytes: string, inode: number[], device: number[], object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string }
export type FilePath = { id: number, pub_id: number[], is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, size_in_bytes: string, inode: number[], device: number[], object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string }
export type GenerateThumbsForLocationArgs = { id: number, path: string }
@ -319,7 +319,7 @@ export type UnlockKeyManagerArgs = { password: string, secret_key: string }
export type Volume = { name: string, mount_point: string, total_capacity: string, available_capacity: string, is_removable: boolean, disk_type: string | null, file_system: string | null, is_root_filesystem: boolean }
export type file_path_with_object = { id: number, is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, size_in_bytes: string, inode: number[], device: number[], object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, object: Object | null }
export type file_path_with_object = { id: number, pub_id: number[], is_dir: boolean, cas_id: string | null, integrity_checksum: string | null, location_id: number, materialized_path: string, name: string, extension: string, size_in_bytes: string, inode: number[], device: number[], object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, object: Object | null }
export type location_with_indexer_rules = { id: number, pub_id: number[], node_id: number, name: string, path: string, total_capacity: number | null, available_capacity: number | null, is_archived: boolean, generate_preview_media: boolean, sync_preview_media: boolean, hidden: boolean, date_created: string, indexer_rules: { indexer_rule: IndexerRule }[] }