mirror of
https://github.com/spacedriveapp/spacedrive
synced 2024-07-02 11:13:29 +00:00
More sync support for file paths + saved searches (#2067)
more sync support for file paths + saved searches
This commit is contained in:
parent
1469ef6ac6
commit
6f28d8ec28
|
@ -124,6 +124,8 @@ model Location {
|
|||
hidden Boolean?
|
||||
date_created DateTime?
|
||||
|
||||
/// @local
|
||||
// this is just a client side cache which is annoying but oh well (@brendan)
|
||||
instance_id Int?
|
||||
instance Instance? @relation(fields: [instance_id], references: [id], onDelete: SetNull)
|
||||
|
||||
|
@ -514,6 +516,7 @@ model Notification {
|
|||
@@map("notification")
|
||||
}
|
||||
|
||||
/// @shared(id: pub_id)
|
||||
model SavedSearch {
|
||||
id Int @id @default(autoincrement())
|
||||
pub_id Bytes @unique
|
||||
|
@ -532,6 +535,7 @@ model SavedSearch {
|
|||
@@map("saved_search")
|
||||
}
|
||||
|
||||
/// @local(id: id)
|
||||
model CloudCRDTOperation {
|
||||
id Bytes @id
|
||||
timestamp BigInt
|
||||
|
|
|
@ -21,8 +21,13 @@ use sd_file_path_helper::{
|
|||
};
|
||||
use sd_images::ConvertableExtension;
|
||||
use sd_media_metadata::MediaMetadata;
|
||||
use sd_prisma::prisma::{file_path, location, object};
|
||||
use sd_prisma::{
|
||||
prisma::{file_path, location, object},
|
||||
prisma_sync,
|
||||
};
|
||||
use sd_sync::OperationFactory;
|
||||
use sd_utils::{db::maybe_missing, error::FileIOError};
|
||||
use serde_json::json;
|
||||
|
||||
use std::{
|
||||
ffi::OsString,
|
||||
|
@ -177,15 +182,36 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
|
||||
R.with2(library())
|
||||
.mutation(|(_, library), args: SetNoteArgs| async move {
|
||||
library
|
||||
.db
|
||||
let Library { db, sync, .. } = library.as_ref();
|
||||
|
||||
let object = db
|
||||
.object()
|
||||
.update(
|
||||
.find_unique(object::id::equals(args.id))
|
||||
.select(object::select!({ pub_id }))
|
||||
.exec()
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
rspc::Error::new(
|
||||
rspc::ErrorCode::NotFound,
|
||||
"Object not found".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
sync.write_op(
|
||||
&db,
|
||||
sync.shared_update(
|
||||
prisma_sync::object::SyncId {
|
||||
pub_id: object.pub_id,
|
||||
},
|
||||
object::note::NAME,
|
||||
json!(&args.note),
|
||||
),
|
||||
db.object().update(
|
||||
object::id::equals(args.id),
|
||||
vec![object::note::set(args.note)],
|
||||
)
|
||||
.exec()
|
||||
.await?;
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
invalidate_query!(library, "search.paths");
|
||||
invalidate_query!(library, "search.objects");
|
||||
|
@ -202,15 +228,36 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
|
||||
R.with2(library())
|
||||
.mutation(|(_, library), args: SetFavoriteArgs| async move {
|
||||
library
|
||||
.db
|
||||
let Library { sync, db, .. } = library.as_ref();
|
||||
|
||||
let object = db
|
||||
.object()
|
||||
.update(
|
||||
.find_unique(object::id::equals(args.id))
|
||||
.select(object::select!({ pub_id }))
|
||||
.exec()
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
rspc::Error::new(
|
||||
rspc::ErrorCode::NotFound,
|
||||
"Object not found".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
sync.write_op(
|
||||
&db,
|
||||
sync.shared_update(
|
||||
prisma_sync::object::SyncId {
|
||||
pub_id: object.pub_id,
|
||||
},
|
||||
object::favorite::NAME,
|
||||
json!(&args.favorite),
|
||||
),
|
||||
db.object().update(
|
||||
object::id::equals(args.id),
|
||||
vec![object::favorite::set(Some(args.favorite))],
|
||||
)
|
||||
.exec()
|
||||
.await?;
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
invalidate_query!(library, "search.paths");
|
||||
invalidate_query!(library, "search.objects");
|
||||
|
@ -251,16 +298,43 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
.procedure("updateAccessTime", {
|
||||
R.with2(library())
|
||||
.mutation(|(_, library), ids: Vec<i32>| async move {
|
||||
library
|
||||
.db
|
||||
let Library { sync, db, .. } = library.as_ref();
|
||||
|
||||
let objects = db
|
||||
.object()
|
||||
.update_many(
|
||||
vec![object::id::in_vec(ids)],
|
||||
vec![object::date_accessed::set(Some(Utc::now().into()))],
|
||||
)
|
||||
.find_many(vec![object::id::in_vec(ids)])
|
||||
.select(object::select!({ id pub_id }))
|
||||
.exec()
|
||||
.await?;
|
||||
|
||||
let date_accessed = Utc::now().into();
|
||||
|
||||
let (sync_params, db_params): (Vec<_>, Vec<_>) = objects
|
||||
.into_iter()
|
||||
.map(|d| {
|
||||
(
|
||||
sync.shared_update(
|
||||
prisma_sync::object::SyncId { pub_id: d.pub_id },
|
||||
object::date_accessed::NAME,
|
||||
json!(date_accessed),
|
||||
),
|
||||
d.id,
|
||||
)
|
||||
})
|
||||
.unzip();
|
||||
|
||||
sync.write_ops(
|
||||
&db,
|
||||
(
|
||||
sync_params,
|
||||
db.object().update_many(
|
||||
vec![object::id::in_vec(db_params)],
|
||||
vec![object::date_accessed::set(Some(date_accessed))],
|
||||
),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
invalidate_query!(library, "search.paths");
|
||||
invalidate_query!(library, "search.objects");
|
||||
Ok(())
|
||||
|
@ -269,16 +343,40 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
.procedure("removeAccessTime", {
|
||||
R.with2(library())
|
||||
.mutation(|(_, library), object_ids: Vec<i32>| async move {
|
||||
library
|
||||
.db
|
||||
let Library { db, sync, .. } = library.as_ref();
|
||||
|
||||
let objects = db
|
||||
.object()
|
||||
.update_many(
|
||||
vec![object::id::in_vec(object_ids)],
|
||||
vec![object::date_accessed::set(None)],
|
||||
)
|
||||
.find_many(vec![object::id::in_vec(object_ids)])
|
||||
.select(object::select!({ id pub_id }))
|
||||
.exec()
|
||||
.await?;
|
||||
|
||||
let (sync_params, db_params): (Vec<_>, Vec<_>) = objects
|
||||
.into_iter()
|
||||
.map(|d| {
|
||||
(
|
||||
sync.shared_update(
|
||||
prisma_sync::object::SyncId { pub_id: d.pub_id },
|
||||
object::date_accessed::NAME,
|
||||
json!(null),
|
||||
),
|
||||
d.id,
|
||||
)
|
||||
})
|
||||
.unzip();
|
||||
sync.write_ops(
|
||||
&db,
|
||||
(
|
||||
sync_params,
|
||||
db.object().update_many(
|
||||
vec![object::id::in_vec(db_params)],
|
||||
vec![object::date_accessed::set(None)],
|
||||
),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
invalidate_query!(library, "search.objects");
|
||||
invalidate_query!(library, "search.paths");
|
||||
Ok(())
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
use crate::{api::utils::library, invalidate_query};
|
||||
use crate::{api::utils::library, invalidate_query, library::Library};
|
||||
|
||||
use sd_prisma::prisma::saved_search;
|
||||
use sd_prisma::{prisma::saved_search, prisma_sync};
|
||||
use sd_sync::OperationFactory;
|
||||
use sd_utils::chain_optional_iter;
|
||||
|
||||
use chrono::{DateTime, FixedOffset, Utc};
|
||||
use rspc::alpha::AlphaRouter;
|
||||
use serde::{de::IgnoredAny, Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use specta::Type;
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
|
@ -31,42 +33,76 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
}
|
||||
|
||||
|(_, library), args: Args| async move {
|
||||
let Library { db, sync, .. } = library.as_ref();
|
||||
let pub_id = Uuid::new_v4().as_bytes().to_vec();
|
||||
let date_created: DateTime<FixedOffset> = Utc::now().into();
|
||||
|
||||
library
|
||||
.db
|
||||
.saved_search()
|
||||
.create(
|
||||
pub_id,
|
||||
chain_optional_iter(
|
||||
[
|
||||
saved_search::date_created::set(Some(date_created)),
|
||||
saved_search::name::set(Some(args.name)),
|
||||
],
|
||||
[
|
||||
args.filters
|
||||
.map(|s| {
|
||||
// https://github.com/serde-rs/json/issues/579
|
||||
// https://docs.rs/serde/latest/serde/de/struct.IgnoredAny.html
|
||||
if let Err(e) = serde_json::from_str::<IgnoredAny>(&s) {
|
||||
error!("failed to parse filters: {e:#?}");
|
||||
None
|
||||
} else {
|
||||
Some(s)
|
||||
}
|
||||
})
|
||||
.map(saved_search::filters::set),
|
||||
args.search.map(Some).map(saved_search::search::set),
|
||||
args.description
|
||||
.map(Some)
|
||||
.map(saved_search::description::set),
|
||||
args.icon.map(Some).map(saved_search::icon::set),
|
||||
],
|
||||
let (sync_params, db_params): (Vec<_>, Vec<_>) = chain_optional_iter(
|
||||
[
|
||||
(
|
||||
(saved_search::date_created::NAME, json!(date_created)),
|
||||
saved_search::date_created::set(Some(date_created)),
|
||||
),
|
||||
)
|
||||
.exec()
|
||||
.await?;
|
||||
(
|
||||
(saved_search::name::NAME, json!(&args.name)),
|
||||
saved_search::name::set(Some(args.name)),
|
||||
),
|
||||
],
|
||||
[
|
||||
args.filters
|
||||
.and_then(|s| {
|
||||
// https://github.com/serde-rs/json/issues/579
|
||||
// https://docs.rs/serde/latest/serde/de/struct.IgnoredAny.html
|
||||
|
||||
if let Err(e) = serde_json::from_str::<IgnoredAny>(&s) {
|
||||
error!("failed to parse filters: {e:#?}");
|
||||
None
|
||||
} else {
|
||||
Some(s)
|
||||
}
|
||||
})
|
||||
.map(|v| {
|
||||
(
|
||||
(saved_search::filters::NAME, json!(&v)),
|
||||
saved_search::filters::set(Some(v)),
|
||||
)
|
||||
}),
|
||||
args.search.map(|v| {
|
||||
(
|
||||
(saved_search::search::NAME, json!(&v)),
|
||||
saved_search::search::set(Some(v)),
|
||||
)
|
||||
}),
|
||||
args.description.map(|v| {
|
||||
(
|
||||
(saved_search::description::NAME, json!(&v)),
|
||||
saved_search::description::set(Some(v)),
|
||||
)
|
||||
}),
|
||||
args.icon.map(|v| {
|
||||
(
|
||||
(saved_search::icon::NAME, json!(&v)),
|
||||
saved_search::icon::set(Some(v)),
|
||||
)
|
||||
}),
|
||||
],
|
||||
)
|
||||
.into_iter()
|
||||
.unzip();
|
||||
|
||||
sync.write_ops(
|
||||
db,
|
||||
(
|
||||
sync.shared_create(
|
||||
prisma_sync::saved_search::SyncId {
|
||||
pub_id: pub_id.clone(),
|
||||
},
|
||||
sync_params,
|
||||
),
|
||||
db.saved_search().create(pub_id, db_params),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
invalidate_query!(library, "search.saved.list");
|
||||
|
||||
|
@ -107,15 +143,81 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
});
|
||||
|
||||
|(_, library), (id, args): (saved_search::id::Type, Args)| async move {
|
||||
let mut params = args.to_params();
|
||||
params.push(saved_search::date_modified::set(Some(Utc::now().into())));
|
||||
let Library { db, sync, .. } = library.as_ref();
|
||||
let updated_at = Utc::now().into();
|
||||
|
||||
library
|
||||
.db
|
||||
let search = db
|
||||
.saved_search()
|
||||
.update_unchecked(saved_search::id::equals(id), params)
|
||||
.find_unique(saved_search::id::equals(id))
|
||||
.select(saved_search::select!({ pub_id }))
|
||||
.exec()
|
||||
.await?;
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
rspc::Error::new(rspc::ErrorCode::NotFound, "search not found".into())
|
||||
})?;
|
||||
|
||||
let (sync_params, db_params): (Vec<_>, Vec<_>) = chain_optional_iter(
|
||||
[(
|
||||
(saved_search::date_modified::NAME, json!(updated_at)),
|
||||
saved_search::date_modified::set(Some(updated_at)),
|
||||
)],
|
||||
[
|
||||
args.name.map(|v| {
|
||||
(
|
||||
(saved_search::name::NAME, json!(&v)),
|
||||
saved_search::name::set(v),
|
||||
)
|
||||
}),
|
||||
args.description.map(|v| {
|
||||
(
|
||||
(saved_search::name::NAME, json!(&v)),
|
||||
saved_search::name::set(v),
|
||||
)
|
||||
}),
|
||||
args.icon.map(|v| {
|
||||
(
|
||||
(saved_search::icon::NAME, json!(&v)),
|
||||
saved_search::icon::set(v),
|
||||
)
|
||||
}),
|
||||
args.search.map(|v| {
|
||||
(
|
||||
(saved_search::search::NAME, json!(&v)),
|
||||
saved_search::search::set(v),
|
||||
)
|
||||
}),
|
||||
args.filters.map(|v| {
|
||||
(
|
||||
(saved_search::filters::NAME, json!(&v)),
|
||||
saved_search::filters::set(v),
|
||||
)
|
||||
}),
|
||||
],
|
||||
)
|
||||
.into_iter()
|
||||
.map(|((k, v), p)| {
|
||||
(
|
||||
sync.shared_update(
|
||||
prisma_sync::saved_search::SyncId {
|
||||
pub_id: search.pub_id.clone(),
|
||||
},
|
||||
k,
|
||||
v,
|
||||
),
|
||||
p,
|
||||
)
|
||||
})
|
||||
.unzip();
|
||||
|
||||
sync.write_ops(
|
||||
&db,
|
||||
(
|
||||
sync_params,
|
||||
db.saved_search()
|
||||
.update_unchecked(saved_search::id::equals(id), db_params),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
invalidate_query!(library, "search.saved.list");
|
||||
invalidate_query!(library, "search.saved.get");
|
||||
|
@ -127,12 +229,27 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
.procedure("delete", {
|
||||
R.with2(library())
|
||||
.mutation(|(_, library), search_id: i32| async move {
|
||||
library
|
||||
.db
|
||||
let Library { db, sync, .. } = library.as_ref();
|
||||
|
||||
let search = db
|
||||
.saved_search()
|
||||
.delete(saved_search::id::equals(search_id))
|
||||
.find_unique(saved_search::id::equals(search_id))
|
||||
.select(saved_search::select!({ pub_id }))
|
||||
.exec()
|
||||
.await?;
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
rspc::Error::new(rspc::ErrorCode::NotFound, "search not found".into())
|
||||
})?;
|
||||
|
||||
sync.write_op(
|
||||
&db,
|
||||
sync.shared_delete(prisma_sync::saved_search::SyncId {
|
||||
pub_id: search.pub_id,
|
||||
}),
|
||||
db.saved_search()
|
||||
.delete(saved_search::id::equals(search_id)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
invalidate_query!(library, "search.saved.list");
|
||||
// disabled as it's messing with pre-delete navigation
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
use crate::{invalidate_query, library::Library, object::tag::TagCreateArgs};
|
||||
|
||||
use sd_cache::{CacheNode, Normalise, NormalisedResult, NormalisedResults, Reference};
|
||||
use sd_file_ext::kind::ObjectKind;
|
||||
use sd_prisma::{
|
||||
prisma::{file_path, object, tag, tag_on_object},
|
||||
prisma_sync,
|
||||
|
@ -165,6 +164,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
.select(file_path::select!({
|
||||
id
|
||||
pub_id
|
||||
is_dir
|
||||
object: select { id pub_id }
|
||||
})),
|
||||
)
|
||||
|
@ -216,37 +216,40 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
)
|
||||
.await?;
|
||||
} else {
|
||||
let (new_objects, _) = db
|
||||
._batch({
|
||||
let (left, right): (Vec<_>, Vec<_>) = file_paths
|
||||
.iter()
|
||||
.filter(|fp| fp.object.is_none())
|
||||
.map(|fp| {
|
||||
let id = uuid_to_bytes(Uuid::new_v4());
|
||||
let mut sync_params = vec![];
|
||||
|
||||
(
|
||||
db.object().create(
|
||||
id.clone(),
|
||||
vec![
|
||||
object::date_created::set(None),
|
||||
object::kind::set(Some(
|
||||
ObjectKind::Folder as i32,
|
||||
)),
|
||||
],
|
||||
),
|
||||
db.file_path().update(
|
||||
file_path::id::equals(fp.id),
|
||||
vec![file_path::object::connect(
|
||||
object::pub_id::equals(id),
|
||||
)],
|
||||
),
|
||||
)
|
||||
})
|
||||
.unzip();
|
||||
let db_params: (Vec<_>, Vec<_>) = file_paths
|
||||
.iter()
|
||||
.filter(|fp| fp.is_dir.unwrap_or_default() && fp.object.is_none())
|
||||
.map(|fp| {
|
||||
let id = uuid_to_bytes(Uuid::new_v4());
|
||||
|
||||
(left, right)
|
||||
sync_params.extend(sync.shared_create(
|
||||
prisma_sync::object::SyncId { pub_id: id.clone() },
|
||||
[],
|
||||
));
|
||||
|
||||
sync_params.push(sync.shared_update(
|
||||
prisma_sync::file_path::SyncId {
|
||||
pub_id: fp.pub_id.clone(),
|
||||
},
|
||||
file_path::object::NAME,
|
||||
json!(id),
|
||||
));
|
||||
|
||||
(
|
||||
db.object().create(id.clone(), vec![]),
|
||||
db.file_path().update(
|
||||
file_path::id::equals(fp.id),
|
||||
vec![file_path::object::connect(object::pub_id::equals(
|
||||
id,
|
||||
))],
|
||||
),
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
.unzip();
|
||||
|
||||
let (new_objects, _) = sync.write_ops(db, (sync_params, db_params)).await?;
|
||||
|
||||
let (sync_ops, db_creates) = objects
|
||||
.into_iter()
|
||||
|
|
|
@ -225,9 +225,9 @@ impl LibraryConfig {
|
|||
Some(size.to_be_bytes().to_vec())
|
||||
} else {
|
||||
error!(
|
||||
"File path <id='{}'> had invalid size: '{}'",
|
||||
path.id, size_in_bytes
|
||||
);
|
||||
"File path <id='{}'> had invalid size: '{}'",
|
||||
path.id, size_in_bytes
|
||||
);
|
||||
None
|
||||
};
|
||||
|
||||
|
|
|
@ -164,6 +164,7 @@ impl StatefulJob for IndexerJobInit {
|
|||
let location_path = maybe_missing(&init.location.path, "location.path").map(Path::new)?;
|
||||
|
||||
let db = Arc::clone(&ctx.library.db);
|
||||
let sync = &ctx.library.sync;
|
||||
|
||||
let indexer_rules = init
|
||||
.location
|
||||
|
@ -235,7 +236,7 @@ impl StatefulJob for IndexerJobInit {
|
|||
|
||||
let db_delete_start = Instant::now();
|
||||
// TODO pass these uuids to sync system
|
||||
let removed_count = remove_non_existing_file_paths(to_remove, &db).await?;
|
||||
let removed_count = remove_non_existing_file_paths(to_remove, &db, sync).await?;
|
||||
let db_delete_time = db_delete_start.elapsed();
|
||||
|
||||
let total_new_paths = &mut 0;
|
||||
|
@ -381,6 +382,7 @@ impl StatefulJob for IndexerJobInit {
|
|||
maybe_missing(&init.location.path, "location.path").map(Path::new)?;
|
||||
|
||||
let db = Arc::clone(&ctx.library.db);
|
||||
let sync = &ctx.library.sync;
|
||||
|
||||
let scan_start = Instant::now();
|
||||
|
||||
|
@ -407,7 +409,8 @@ impl StatefulJob for IndexerJobInit {
|
|||
|
||||
let db_delete_time = Instant::now();
|
||||
// TODO pass these uuids to sync system
|
||||
new_metadata.removed_count = remove_non_existing_file_paths(to_remove, &db).await?;
|
||||
new_metadata.removed_count =
|
||||
remove_non_existing_file_paths(to_remove, &db, sync).await?;
|
||||
new_metadata.db_write_time = db_delete_time.elapsed();
|
||||
|
||||
let to_walk_count = to_walk.len();
|
||||
|
|
|
@ -304,15 +304,29 @@ fn iso_file_path_factory(
|
|||
async fn remove_non_existing_file_paths(
|
||||
to_remove: impl IntoIterator<Item = file_path_pub_and_cas_ids::Data>,
|
||||
db: &PrismaClient,
|
||||
sync: &sd_core_sync::Manager,
|
||||
) -> Result<u64, IndexerError> {
|
||||
db.file_path()
|
||||
.delete_many(vec![file_path::pub_id::in_vec(
|
||||
to_remove.into_iter().map(|data| data.pub_id).collect(),
|
||||
)])
|
||||
.exec()
|
||||
.await
|
||||
.map(|count| count as u64)
|
||||
.map_err(Into::into)
|
||||
let (sync_params, db_params): (Vec<_>, Vec<_>) = to_remove
|
||||
.into_iter()
|
||||
.map(|d| {
|
||||
(
|
||||
sync.shared_delete(prisma_sync::file_path::SyncId { pub_id: d.pub_id }),
|
||||
d.id,
|
||||
)
|
||||
})
|
||||
.unzip();
|
||||
|
||||
sync.write_ops(
|
||||
db,
|
||||
(
|
||||
sync_params,
|
||||
db.file_path()
|
||||
.delete_many(vec![file_path::id::in_vec(db_params)]),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
// TODO: Change this macro to a fn when we're able to return
|
||||
|
@ -422,6 +436,7 @@ macro_rules! to_remove_db_fetcher_fn {
|
|||
.into_iter()
|
||||
.filter(|file_path| !founds_ids.contains(&file_path.id))
|
||||
.map(|file_path| ::sd_file_path_helper::file_path_pub_and_cas_ids::Data {
|
||||
id: file_path.id,
|
||||
pub_id: file_path.pub_id,
|
||||
cas_id: file_path.cas_id,
|
||||
}),
|
||||
|
|
|
@ -46,6 +46,7 @@ pub async fn shallow(
|
|||
let location_path = maybe_missing(&location.path, "location.path").map(Path::new)?;
|
||||
|
||||
let db = library.db.clone();
|
||||
let sync = &library.sync;
|
||||
|
||||
let indexer_rules = location
|
||||
.indexer_rules
|
||||
|
@ -103,7 +104,7 @@ pub async fn shallow(
|
|||
errors.into_iter().for_each(|e| error!("{e}"));
|
||||
|
||||
// TODO pass these uuids to sync system
|
||||
remove_non_existing_file_paths(to_remove, &db).await?;
|
||||
remove_non_existing_file_paths(to_remove, &db, sync).await?;
|
||||
|
||||
let mut new_directories_to_scan = HashSet::new();
|
||||
|
||||
|
|
|
@ -741,7 +741,7 @@ pub(super) async fn rename(
|
|||
let location_path = extract_location_path(location_id, library).await?;
|
||||
let old_path = old_path.as_ref();
|
||||
let new_path = new_path.as_ref();
|
||||
let Library { db, .. } = library;
|
||||
let Library { db, sync, .. } = library;
|
||||
|
||||
let old_path_materialized_str =
|
||||
extract_normalized_materialized_path_str(location_id, &location_path, old_path)?;
|
||||
|
@ -784,8 +784,8 @@ pub(super) async fn rename(
|
|||
let old_parts = old.to_parts();
|
||||
// TODO: Fetch all file_paths that will be updated and dispatch sync events
|
||||
|
||||
let updated = library
|
||||
.db
|
||||
// This is NOT sync compatible! @brendan
|
||||
let updated = db
|
||||
._execute_raw(raw!(
|
||||
"UPDATE file_path \
|
||||
SET materialized_path = REPLACE(materialized_path, {}, {}) \
|
||||
|
@ -807,23 +807,56 @@ pub(super) async fn rename(
|
|||
|
||||
let is_hidden = path_is_hidden(new_path, &new_path_metadata);
|
||||
|
||||
library
|
||||
.db
|
||||
.file_path()
|
||||
.update(
|
||||
file_path::pub_id::equals(file_path.pub_id),
|
||||
vec![
|
||||
file_path::materialized_path::set(Some(new_path_materialized_str)),
|
||||
file_path::name::set(Some(new_parts.name.to_string())),
|
||||
file_path::extension::set(Some(new_parts.extension.to_string())),
|
||||
file_path::date_modified::set(Some(
|
||||
DateTime::<Utc>::from(new_path_metadata.modified_or_now()).into(),
|
||||
)),
|
||||
file_path::hidden::set(Some(is_hidden)),
|
||||
],
|
||||
)
|
||||
.exec()
|
||||
.await?;
|
||||
let date_modified = DateTime::<Utc>::from(new_path_metadata.modified_or_now()).into();
|
||||
|
||||
let (sync_params, db_params): (Vec<_>, Vec<_>) = [
|
||||
(
|
||||
(
|
||||
file_path::materialized_path::NAME,
|
||||
json!(new_path_materialized_str),
|
||||
),
|
||||
file_path::materialized_path::set(Some(new_path_materialized_str)),
|
||||
),
|
||||
(
|
||||
(file_path::name::NAME, json!(new_parts.name)),
|
||||
file_path::name::set(Some(new_parts.name.to_string())),
|
||||
),
|
||||
(
|
||||
(file_path::extension::NAME, json!(new_parts.extension)),
|
||||
file_path::extension::set(Some(new_parts.extension.to_string())),
|
||||
),
|
||||
(
|
||||
(file_path::date_modified::NAME, json!(&date_modified)),
|
||||
file_path::date_modified::set(Some(date_modified)),
|
||||
),
|
||||
(
|
||||
(file_path::hidden::NAME, json!(is_hidden)),
|
||||
file_path::hidden::set(Some(is_hidden)),
|
||||
),
|
||||
]
|
||||
.into_iter()
|
||||
.unzip();
|
||||
|
||||
sync.write_ops(
|
||||
db,
|
||||
(
|
||||
sync_params
|
||||
.into_iter()
|
||||
.map(|(k, v)| {
|
||||
sync.shared_update(
|
||||
prisma_sync::file_path::SyncId {
|
||||
pub_id: file_path.pub_id.clone(),
|
||||
},
|
||||
k,
|
||||
v,
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
db.file_path()
|
||||
.update(file_path::pub_id::equals(file_path.pub_id), db_params),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
invalidate_query!(library, "search.paths");
|
||||
invalidate_query!(library, "search.objects");
|
||||
|
@ -870,7 +903,7 @@ pub(super) async fn remove_by_file_path(
|
|||
todo!("file has changed in some way, re-identify it")
|
||||
}
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => {
|
||||
let db = &library.db;
|
||||
let Library { sync, db, .. } = library;
|
||||
|
||||
let is_dir = maybe_missing(file_path.is_dir, "file_path.is_dir")?;
|
||||
|
||||
|
@ -883,10 +916,14 @@ pub(super) async fn remove_by_file_path(
|
|||
)
|
||||
.await?;
|
||||
} else {
|
||||
db.file_path()
|
||||
.delete(file_path::pub_id::equals(file_path.pub_id.clone()))
|
||||
.exec()
|
||||
.await?;
|
||||
sync.write_op(
|
||||
&db,
|
||||
sync.shared_delete(prisma_sync::file_path::SyncId {
|
||||
pub_id: file_path.pub_id.clone(),
|
||||
}),
|
||||
db.file_path().delete(file_path::id::equals(file_path.id)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(object_id) = file_path.object_id {
|
||||
db.object()
|
||||
|
|
|
@ -741,6 +741,8 @@ pub async fn delete_location(
|
|||
library: &Arc<Library>,
|
||||
location_id: location::id::Type,
|
||||
) -> Result<(), LocationError> {
|
||||
let Library { db, sync, .. } = library.as_ref();
|
||||
|
||||
let start = Instant::now();
|
||||
node.locations.remove(location_id, library.clone()).await?;
|
||||
debug!(
|
||||
|
@ -808,12 +810,14 @@ pub async fn delete_location(
|
|||
|
||||
let start = Instant::now();
|
||||
|
||||
library
|
||||
.db
|
||||
.location()
|
||||
.delete(location::id::equals(location_id))
|
||||
.exec()
|
||||
.await?;
|
||||
sync.write_op(
|
||||
db,
|
||||
sync.shared_delete(prisma_sync::location::SyncId {
|
||||
pub_id: location.pub_id,
|
||||
}),
|
||||
db.location().delete(location::id::equals(location_id)),
|
||||
)
|
||||
.await?;
|
||||
|
||||
debug!(
|
||||
"Elapsed time to delete location from db: {:?}",
|
||||
|
@ -836,6 +840,8 @@ pub async fn delete_directory(
|
|||
) -> Result<(), QueryError> {
|
||||
let Library { db, .. } = library;
|
||||
|
||||
// This is NOT sync-compatible!
|
||||
// Sync requires having sync ids available.
|
||||
let children_params = sd_utils::chain_optional_iter(
|
||||
[file_path::location_id::equals(Some(location_id))],
|
||||
[parent_iso_file_path.and_then(|parent| {
|
||||
|
@ -1061,30 +1067,60 @@ pub async fn create_file_path(
|
|||
location_id,
|
||||
))?;
|
||||
|
||||
let params = {
|
||||
let (sync_params, db_params): (Vec<_>, Vec<_>) = {
|
||||
use file_path::*;
|
||||
|
||||
vec![
|
||||
[
|
||||
(
|
||||
location::NAME,
|
||||
json!(prisma_sync::location::SyncId {
|
||||
pub_id: location.pub_id
|
||||
}),
|
||||
(
|
||||
location::NAME,
|
||||
json!(prisma_sync::location::SyncId {
|
||||
pub_id: location.pub_id
|
||||
}),
|
||||
),
|
||||
location::connect(prisma::location::id::equals(location.id)),
|
||||
),
|
||||
(cas_id::NAME, json!(cas_id)),
|
||||
(materialized_path::NAME, json!(materialized_path)),
|
||||
(name::NAME, json!(name)),
|
||||
(extension::NAME, json!(extension)),
|
||||
((cas_id::NAME, json!(cas_id)), cas_id::set(cas_id)),
|
||||
(
|
||||
size_in_bytes_bytes::NAME,
|
||||
json!(metadata.size_in_bytes.to_be_bytes().to_vec()),
|
||||
(materialized_path::NAME, json!(materialized_path)),
|
||||
materialized_path::set(Some(materialized_path.into())),
|
||||
),
|
||||
((name::NAME, json!(name)), name::set(Some(name.into()))),
|
||||
(
|
||||
(extension::NAME, json!(extension)),
|
||||
extension::set(Some(extension.into())),
|
||||
),
|
||||
(
|
||||
(
|
||||
size_in_bytes_bytes::NAME,
|
||||
json!(metadata.size_in_bytes.to_be_bytes().to_vec()),
|
||||
),
|
||||
size_in_bytes_bytes::set(Some(metadata.size_in_bytes.to_be_bytes().to_vec())),
|
||||
),
|
||||
(
|
||||
(inode::NAME, json!(metadata.inode.to_le_bytes())),
|
||||
inode::set(Some(inode_to_db(metadata.inode))),
|
||||
),
|
||||
((is_dir::NAME, json!(is_dir)), is_dir::set(Some(is_dir))),
|
||||
(
|
||||
(date_created::NAME, json!(metadata.created_at)),
|
||||
date_created::set(Some(metadata.created_at.into())),
|
||||
),
|
||||
(
|
||||
(date_modified::NAME, json!(metadata.modified_at)),
|
||||
date_modified::set(Some(metadata.modified_at.into())),
|
||||
),
|
||||
(
|
||||
(date_indexed::NAME, json!(indexed_at)),
|
||||
date_indexed::set(Some(indexed_at.into())),
|
||||
),
|
||||
(
|
||||
(hidden::NAME, json!(metadata.hidden)),
|
||||
hidden::set(Some(metadata.hidden)),
|
||||
),
|
||||
(inode::NAME, json!(metadata.inode.to_le_bytes())),
|
||||
(is_dir::NAME, json!(is_dir)),
|
||||
(date_created::NAME, json!(metadata.created_at)),
|
||||
(date_modified::NAME, json!(metadata.modified_at)),
|
||||
(date_indexed::NAME, json!(indexed_at)),
|
||||
]
|
||||
.into_iter()
|
||||
.unzip()
|
||||
};
|
||||
|
||||
let pub_id = sd_utils::uuid_to_bytes(Uuid::new_v4());
|
||||
|
@ -1097,27 +1133,9 @@ pub async fn create_file_path(
|
|||
prisma_sync::file_path::SyncId {
|
||||
pub_id: pub_id.clone(),
|
||||
},
|
||||
params,
|
||||
sync_params,
|
||||
),
|
||||
db.file_path().create(pub_id, {
|
||||
use file_path::*;
|
||||
vec![
|
||||
location::connect(prisma::location::id::equals(location.id)),
|
||||
materialized_path::set(Some(materialized_path.into())),
|
||||
name::set(Some(name.into())),
|
||||
extension::set(Some(extension.into())),
|
||||
inode::set(Some(inode_to_db(metadata.inode))),
|
||||
cas_id::set(cas_id),
|
||||
is_dir::set(Some(is_dir)),
|
||||
size_in_bytes_bytes::set(Some(
|
||||
metadata.size_in_bytes.to_be_bytes().to_vec(),
|
||||
)),
|
||||
date_created::set(Some(metadata.created_at.into())),
|
||||
date_modified::set(Some(metadata.modified_at.into())),
|
||||
date_indexed::set(Some(indexed_at.into())),
|
||||
hidden::set(Some(metadata.hidden)),
|
||||
]
|
||||
}),
|
||||
db.file_path().create(pub_id, db_params),
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
|
|
@ -7,7 +7,11 @@ use crate::{
|
|||
location::get_location_path_from_location_id,
|
||||
};
|
||||
|
||||
use sd_prisma::prisma::{file_path, location};
|
||||
use sd_prisma::{
|
||||
prisma::{file_path, location},
|
||||
prisma_sync,
|
||||
};
|
||||
use sd_sync::OperationFactory;
|
||||
use sd_utils::{db::maybe_missing, error::FileIOError};
|
||||
|
||||
use std::hash::Hash;
|
||||
|
@ -70,6 +74,8 @@ impl StatefulJob for FileDeleterJobInit {
|
|||
// need to handle stuff such as querying prisma for all paths of a file, and deleting all of those if requested (with a checkbox in the ui)
|
||||
// maybe a files.countOccurances/and or files.getPath(location_id, path_id) to show how many of these files would be deleted (and where?)
|
||||
|
||||
let Library { db, sync, .. } = ctx.library.as_ref();
|
||||
|
||||
match if maybe_missing(step.file_path.is_dir, "file_path.is_dir")? {
|
||||
fs::remove_dir_all(&step.full_path).await
|
||||
} else {
|
||||
|
@ -81,12 +87,15 @@ impl StatefulJob for FileDeleterJobInit {
|
|||
"File not found in the file system, will remove from database: {}",
|
||||
step.full_path.display()
|
||||
);
|
||||
ctx.library
|
||||
.db
|
||||
.file_path()
|
||||
.delete(file_path::id::equals(step.file_path.id))
|
||||
.exec()
|
||||
.await?;
|
||||
sync.write_op(
|
||||
&db,
|
||||
sync.shared_delete(prisma_sync::file_path::SyncId {
|
||||
pub_id: step.file_path.pub_id.clone(),
|
||||
}),
|
||||
db.file_path()
|
||||
.delete(file_path::id::equals(step.file_path.id)),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(JobError::from(FileIOError::from((&step.full_path, e))));
|
||||
|
|
|
@ -22,7 +22,7 @@ pub use isolated_file_path_data::{
|
|||
};
|
||||
|
||||
// File Path selectables!
|
||||
file_path::select!(file_path_pub_and_cas_ids { pub_id cas_id });
|
||||
file_path::select!(file_path_pub_and_cas_ids { id pub_id cas_id });
|
||||
file_path::select!(file_path_just_pub_id_materialized_path {
|
||||
pub_id
|
||||
materialized_path
|
||||
|
|
Loading…
Reference in a new issue