More sync impls (#586)

tags and stuff
Brendan Allan 2023-03-03 13:23:10 +08:00 committed by GitHub
parent fe215566d6
commit 420aa75da5
6 changed files with 213 additions and 96 deletions

@@ -155,7 +155,7 @@ model Object {
   @@map("object")
 }
 
-/// @shared(id: [location, id])
+/// @owned(id: [location, id])
 model FilePath {
   id Int
   is_dir Boolean @default(false)

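The only schema change reclassifies FilePath from @shared to @owned replication, keyed by the composite [location, id]. Judging from how the ids are deserialized later in this commit (id.location.pub_id, id.id), the generated sync id types are shaped roughly like this sketch (field types assumed):

    pub mod file_path {
        #[derive(serde::Serialize, serde::Deserialize)]
        pub struct SyncId {
            pub location: super::location::SyncId, // file paths are owned via their location
            pub id: i32,
        }
    }

    pub mod location {
        #[derive(serde::Serialize, serde::Deserialize)]
        pub struct SyncId {
            pub pub_id: Vec<u8>, // UUID bytes, per Uuid::new_v4().as_bytes().to_vec()
        }
    }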
@@ -1,4 +1,5 @@
 use crate::{
+    library::LibraryContext,
     location::{
         delete_location, fetch_location,
         indexer::{indexer_job::indexer_job_location, rules::IndexerRuleCreateArgs},
@@ -79,8 +80,9 @@ pub(crate) fn mount() -> impl RouterBuilderLike<Ctx> {
     }
 
     t(|_, mut args: LocationExplorerArgs, library| async move {
-        let location = library
-            .db
+        let LibraryContext { db, .. } = &library;
+
+        let location = db
             .location()
             .find_unique(location::id::equals(args.location_id))
             .exec()
@@ -93,8 +95,7 @@ pub(crate) fn mount() -> impl RouterBuilderLike<Ctx> {
             args.path += "/";
         }
 
-        let directory = library
-            .db
+        let directory = db
             .file_path()
             .find_first(vec![
                 file_path::location_id::equals(location.id),
@@ -107,8 +108,7 @@ pub(crate) fn mount() -> impl RouterBuilderLike<Ctx> {
                 rspc::Error::new(ErrorCode::NotFound, "Directory not found".into())
             })?;
 
-        let file_paths = library
-            .db
+        let file_paths = db
             .file_path()
             .find_many(vec![
                 file_path::location_id::equals(location.id),

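These hunks are one mechanical refactor: the handler destructures LibraryContext once and borrows db directly instead of reaching through library.db before every query. In miniature (stand-in types):

    struct Db;
    struct LibraryContext { db: Db }

    fn handler(library: &LibraryContext) {
        let LibraryContext { db, .. } = library; // borrow the handle once
        let _q1 = db; // each query now uses `db` directly
        let _q2 = db;
    }

    fn main() {
        handler(&LibraryContext { db: Db });
    }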
@@ -1,6 +1,7 @@
 use rspc::{ErrorCode, Type};
 use serde::Deserialize;
+use serde_json::json;
 use tracing::info;
 use uuid::Uuid;
@@ -9,6 +10,7 @@ use crate::{
     invalidate_query,
     library::LibraryContext,
     prisma::{object, tag, tag_on_object},
+    sync,
 };
 
 use super::{utils::LibraryRequest, RouterBuilder};
@@ -138,17 +140,27 @@ pub(crate) fn mount() -> RouterBuilder {
     }
 
     t(|_, args: TagCreateArgs, library| async move {
-        let created_tag = library
-            .db
-            .tag()
-            .create(
-                Uuid::new_v4().as_bytes().to_vec(),
-                vec![
-                    tag::name::set(Some(args.name)),
-                    tag::color::set(Some(args.color)),
-                ],
+        let LibraryContext { db, sync, .. } = &library;
+
+        let pub_id = Uuid::new_v4().as_bytes().to_vec();
+
+        let created_tag = sync
+            .write_op(
+                db,
+                sync.unique_shared_create(
+                    sync::tag::SyncId {
+                        pub_id: pub_id.clone(),
+                    },
+                    [("name", json!(args.name)), ("color", json!(args.color))],
+                ),
+                db.tag().create(
+                    pub_id,
+                    vec![
+                        tag::name::set(Some(args.name)),
+                        tag::color::set(Some(args.color)),
+                    ],
+                ),
             )
-            .exec()
             .await?;
 
         invalidate_query!(library, "tags.list");
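Tag creation now runs through the sync manager: a pub_id is generated up front so the CRDT operation and the database row share one identity, and write_op persists the pair together. Following the body of unique_shared_create at the bottom of this commit, the payload handed to new_op for this call should look roughly like the sketch below (the exact model string and the CRDTOperation envelope are assumptions):

    // Sketch, assuming the sd_sync types are in scope.
    let payload = CRDTOperationType::Shared(SharedOperation {
        model: "Tag".to_string(), // TModel::MODEL for the tag model; exact string assumed
        record_id: json!(sync::tag::SyncId { pub_id: pub_id.clone() }),
        data: SharedOperationData::Create(SharedOperationCreateData::Unique(
            [("name", json!(args.name)), ("color", json!(args.color))]
                .into_iter()
                .map(|(name, value)| (name.to_string(), value))
                .collect(),
        )),
    });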
@@ -199,15 +211,42 @@ pub(crate) fn mount() -> RouterBuilder {
     }
 
     t(|_, args: TagUpdateArgs, library| async move {
-        library
-            .db
+        let LibraryContext { sync, db, .. } = &library;
+
+        let tag = db
             .tag()
-            .update(
-                tag::id::equals(args.id),
-                vec![tag::name::set(args.name), tag::color::set(args.color)],
-            )
+            .find_unique(tag::id::equals(args.id))
+            .select(tag::select!({ pub_id }))
             .exec()
-            .await?;
+            .await?
+            .unwrap();
+
+        sync.write_ops(
+            db,
+            (
+                [
+                    args.name.as_ref().map(|v| ("name", json!(v))),
+                    args.color.as_ref().map(|v| ("color", json!(v))),
+                ]
+                .into_iter()
+                .flatten()
+                .map(|(k, v)| {
+                    sync.shared_update(
+                        sync::tag::SyncId {
+                            pub_id: tag.pub_id.clone(),
+                        },
+                        k,
+                        v,
+                    )
+                })
+                .collect(),
+                db.tag().update(
+                    tag::id::equals(args.id),
+                    vec![tag::name::set(args.name), tag::color::set(args.color)],
+                ),
+            ),
+        )
+        .await?;
 
         invalidate_query!(library, "tags.list");

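The update path is chattier because shared updates are recorded per field. The handler first fetches the tag's pub_id (sync identity travels by public id, not the local autoincrement id), then emits one shared_update op for every field that is actually Some, and hands the batch plus a single Prisma update to write_ops. Condensed, with write_ops' signature inferred from this call site:

    // write_ops appears to take (Vec<CRDTOperation>, query): one CRDT op per
    // changed field, one query applying them all locally.
    let ops: Vec<_> = [
        args.name.as_ref().map(|v| ("name", json!(v))),
        args.color.as_ref().map(|v| ("color", json!(v))),
    ]
    .into_iter()
    .flatten() // untouched fields drop out here
    .map(|(k, v)| sync.shared_update(sync::tag::SyncId { pub_id: tag.pub_id.clone() }, k, v))
    .collect();

    sync.write_ops(db, (ops, db.tag().update(/* id, set params */))).await?;

Note the .unwrap() on the fetched tag: an unknown args.id will panic here rather than return an error.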
@@ -1,5 +1,6 @@
 use crate::{
     job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
+    library::LibraryContext,
     location::indexer::rules::RuleKind,
     prisma::{file_path, location},
     sync,
@@ -217,7 +218,7 @@ impl StatefulJob for IndexerJob {
         ctx: WorkerContext,
         state: &mut JobState<Self>,
     ) -> Result<(), JobError> {
-        let db = &ctx.library_ctx.db;
+        let LibraryContext { sync, db, .. } = &ctx.library_ctx;
 
         let location = &state.init.location;
@@ -284,12 +285,10 @@ impl StatefulJob for IndexerJob {
             })
             .unzip();
 
-        let count = ctx
-            .library_ctx
-            .sync
+        let count = sync
             .write_op(
                 db,
-                ctx.library_ctx.sync.owned_create_many(sync_stuff, true),
+                sync.owned_create_many(sync_stuff, true),
                 db.file_path().create_many(paths).skip_duplicates(),
             )
             .await?;

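The indexer's bulk insert follows the same recipe with the owned-model helpers: owned_create_many pairs with Prisma's create_many, and its boolean appears to mirror the skip_duplicates() on the query (on the ingest side below, OwnedOperationData::CreateMany carries a skip_duplicates flag). Per the signature later in this diff, each element of sync_stuff is a (sync id, field array) pair; a hypothetical file-path entry might look like:

    // Hypothetical entry; field names are illustrative, the tuple shape comes from
    // owned_create_many's `impl IntoIterator<Item = (TSyncId, [(&'static str, Value); SIZE])>`.
    let entry = (
        sync::file_path::SyncId {
            location: sync::location::SyncId { pub_id: location_pub_id.clone() },
            id: 42,
        },
        [("name", json!("IMG_0001")), ("is_dir", json!(false))],
    );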
@@ -164,33 +164,51 @@ pub struct LocationUpdateArgs {
 impl LocationUpdateArgs {
     pub async fn update(self, ctx: &LibraryContext) -> Result<(), LocationError> {
+        let LibraryContext { sync, db, .. } = &ctx;
+
         let location = fetch_location(ctx, self.id)
             .include(location::include!({ indexer_rules }))
             .exec()
             .await?
             .ok_or(LocationError::IdNotFound(self.id))?;
 
-        let params = [
+        let (sync_params, db_params): (Vec<_>, Vec<_>) = [
             self.name
                 .clone()
                 .filter(|name| &location.name != name)
-                .map(location::name::set),
-            self.generate_preview_media
-                .map(location::generate_preview_media::set),
-            self.sync_preview_media
-                .map(location::sync_preview_media::set),
-            self.hidden.map(location::hidden::set),
+                .map(|v| (("name", json!(v)), location::name::set(v))),
+            self.generate_preview_media.map(|v| {
+                (
+                    ("generate_preview_media", json!(v)),
+                    location::generate_preview_media::set(v),
+                )
+            }),
+            self.sync_preview_media.map(|v| {
+                (
+                    ("sync_preview_media", json!(v)),
+                    location::sync_preview_media::set(v),
+                )
+            }),
+            self.hidden
+                .map(|v| (("hidden", json!(v)), location::hidden::set(v))),
         ]
         .into_iter()
         .flatten()
-        .collect::<Vec<_>>();
+        .unzip();
 
-        if !params.is_empty() {
-            ctx.db
-                .location()
-                .update(location::id::equals(self.id), params)
-                .exec()
-                .await?;
+        if !sync_params.is_empty() {
+            sync.write_op(
+                db,
+                sync.owned_update(
+                    sync::location::SyncId {
+                        pub_id: location.pub_id,
+                    },
+                    sync_params,
+                ),
+                db.location()
+                    .update(location::id::equals(self.id), db_params),
+            )
+            .await?;
 
             if location.node_id == ctx.node_local_id {
                 if let Some(mut metadata) =
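The pattern worth noting here is that each optional field is mapped to a ((field, json value), SetParam) pair, and a single .unzip() splits the pairs into the sync payload and the Prisma params, so the two representations cannot drift apart. A standalone miniature of the same idea (strings stand in for Prisma's SetParam):

    use serde_json::{json, Value};

    fn main() {
        let name: Option<String> = Some("Movies".into());
        let hidden: Option<bool> = None;

        let (sync_params, db_params): (Vec<(&str, Value)>, Vec<String>) = [
            name.map(|v| (("name", json!(v.clone())), format!("set name = {v}"))),
            hidden.map(|v| (("hidden", json!(v)), format!("set hidden = {v}"))),
        ]
        .into_iter()
        .flatten() // untouched (None) fields vanish before the unzip
        .unzip();

        assert_eq!(sync_params.len(), db_params.len()); // always pairwise
    }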
@@ -301,26 +319,35 @@ pub async fn relink_location(
     ctx: &LibraryContext,
     location_path: impl AsRef<Path>,
 ) -> Result<(), LocationError> {
+    let LibraryContext { db, id, sync, .. } = &ctx;
+
     let mut metadata = SpacedriveLocationMetadataFile::try_load(&location_path)
         .await?
        .ok_or_else(|| LocationError::MissingMetadataFile(location_path.as_ref().to_path_buf()))?;
 
-    metadata.relink(ctx.id, &location_path).await?;
+    metadata.relink(*id, &location_path).await?;
 
-    ctx.db
-        .location()
-        .update(
-            location::pub_id::equals(metadata.location_pub_id(ctx.id)?.as_ref().to_vec()),
-            vec![location::path::set(
-                location_path
-                    .as_ref()
-                    .to_str()
-                    .expect("Found non-UTF-8 path")
-                    .to_string(),
-            )],
-        )
-        .exec()
-        .await?;
+    let pub_id = metadata.location_pub_id(ctx.id)?.as_ref().to_vec();
+    let path = location_path
+        .as_ref()
+        .to_str()
+        .expect("Found non-UTF-8 path")
+        .to_string();
+
+    sync.write_op(
+        db,
+        sync.owned_update(
+            sync::location::SyncId {
+                pub_id: pub_id.clone(),
+            },
+            [("path", json!(path))],
+        ),
+        db.location().update(
+            location::pub_id::equals(pub_id),
+            vec![location::path::set(path)],
+        ),
+    )
+    .await?;
 
     Ok(())
 }

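relink_location gets the same treatment: pub_id and path are hoisted into locals so the owned update and the Prisma update are built from identical values. This call site passes a fixed-size array while LocationUpdateArgs::update above passes a Vec, which is presumably why owned_update's values parameter is loosened below to impl IntoIterator. A self-contained illustration of why both shapes now satisfy the one signature:

    use serde_json::{json, Value};

    // Stand-in for SyncManager::owned_update's loosened `values` parameter.
    fn owned_update(values: impl IntoIterator<Item = (&'static str, Value)>) -> Vec<(String, Value)> {
        values.into_iter().map(|(k, v)| (k.to_string(), v)).collect()
    }

    fn main() {
        owned_update([("path", json!("/mnt/data"))]); // array, as in relink_location
        owned_update(vec![("hidden", json!(true))]);  // Vec, as in LocationUpdateArgs::update
    }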
@@ -1,8 +1,10 @@
 use crate::{
-    prisma::{file_path, location, node, object, owned_operation, shared_operation, PrismaClient},
-    prisma_sync,
+    prisma::{
+        file_path, location, node, object, owned_operation, shared_operation, tag, PrismaClient,
+    },
+    sync,
 };
 
+use prisma_client_rust::ModelTypes;
 use sd_sync::*;
 use futures::future::join_all;
@@ -196,15 +198,15 @@ impl SyncManager {
     }
 
     pub async fn ingest_op(&self, op: CRDTOperation) -> prisma_client_rust::Result<()> {
+        let db = &self.db;
+
         match op.typ {
             CRDTOperationType::Owned(owned_op) => match owned_op.model.as_str() {
-                "FilePath" => {
+                file_path::Types::MODEL => {
                     for item in owned_op.items {
-                        let id: prisma_sync::file_path::SyncId =
-                            serde_json::from_value(item.id).unwrap();
+                        let id: sync::file_path::SyncId = serde_json::from_value(item.id).unwrap();
 
-                        let location = self
-                            .db
+                        let location = db
                             .location()
                             .find_unique(location::pub_id::equals(id.location.pub_id))
                             .select(location::select!({ id }))
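On the ingest side, the stringly-typed match arms ("FilePath", "Location", "Object") give way to the MODEL constants exposed through prisma-client-rust's ModelTypes trait (hence the new import), and the prisma_sync module is replaced by the new sync module used throughout this commit. The payoff of the constants: a renamed model becomes a compile error instead of a silently dead match arm. A standalone miniature:

    // A typo'd string literal still compiles; a typo'd constant path does not.
    struct FilePathModel;
    impl FilePathModel {
        const MODEL: &'static str = "FilePath"; // stand-in for file_path::Types::MODEL
    }

    fn route(model: &str) -> &'static str {
        match model {
            FilePathModel::MODEL => "ingest file_path op",
            _ => "unknown model",
        }
    }

    fn main() {
        assert_eq!(route("FilePath"), "ingest file_path op");
    }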
@@ -214,8 +216,7 @@ impl SyncManager {
 
                         match item.data {
                             OwnedOperationData::Create(mut data) => {
-                                self.db
-                                    .file_path()
+                                db.file_path()
                                     .create(
                                         id.id,
                                         location::id::equals(location.id),
@@ -244,21 +245,21 @@ impl SyncManager {
                                 values,
                                 skip_duplicates,
                             } => {
-                                let location_ids =
-                                    values
-                                        .iter()
-                                        .map(|(id, _)| {
-                                            serde_json::from_value::<prisma_sync::file_path::SyncId>(id.clone())
-                                                .unwrap()
-                                                .location
-                                                .pub_id
-                                        })
-                                        .collect::<HashSet<_>>();
+                                let location_ids = values
+                                    .iter()
+                                    .map(|(id, _)| {
+                                        serde_json::from_value::<sync::file_path::SyncId>(
+                                            id.clone(),
+                                        )
+                                        .unwrap()
+                                        .location
+                                        .pub_id
+                                    })
+                                    .collect::<HashSet<_>>();
 
                                 let location_id_mappings =
                                     join_all(location_ids.iter().map(|id| async move {
-                                        self.db
-                                            .location()
+                                        db.location()
                                             .find_unique(location::pub_id::equals(id.clone()))
                                             .exec()
                                             .await
@@ -270,11 +271,11 @@ impl SyncManager {
                                     .flatten()
                                     .collect::<HashMap<_, _>>();
 
-                                let mut q = self.db.file_path().create_many(
+                                let mut q = db.file_path().create_many(
                                     values
                                         .into_iter()
                                         .map(|(id, mut data)| {
-                                            let id: prisma_sync::file_path::SyncId =
+                                            let id: sync::file_path::SyncId =
                                                 serde_json::from_value(id).unwrap();
 
                                             file_path::create_unchecked(
@@ -330,14 +331,13 @@
                         }
                     }
                 }
-                "Location" => {
+                location::Types::MODEL => {
                     for item in owned_op.items {
-                        let id: prisma_sync::location::SyncId = from_value(item.id).unwrap();
+                        let id: sync::location::SyncId = from_value(item.id).unwrap();
 
                         match item.data {
                             OwnedOperationData::Create(mut data) => {
-                                self.db
-                                    .location()
+                                db.location()
                                     .create(
                                         id.pub_id,
                                         serde_json::from_value(data.remove("name").unwrap())
@@ -368,13 +368,12 @@
                 _ => {}
             },
             CRDTOperationType::Shared(shared_op) => match shared_op.model.as_str() {
-                "Object" => {
-                    let id: prisma_sync::object::SyncId = from_value(shared_op.record_id).unwrap();
+                object::Types::MODEL => {
+                    let id: sync::object::SyncId = from_value(shared_op.record_id).unwrap();
 
                     match shared_op.data {
                         SharedOperationData::Create(_) => {
-                            self.db
-                                .object()
+                            db.object()
                                 .upsert(
                                     object::pub_id::equals(id.pub_id.clone()),
                                     (id.pub_id, vec![]),
@@ -385,8 +384,7 @@
                                 .ok();
                         }
                         SharedOperationData::Update { field, value } => {
-                            self.db
-                                .object()
+                            db.object()
                                 .update(
                                     object::pub_id::equals(id.pub_id),
                                     vec![object::SetParam::deserialize(&field, value).unwrap()],
@@ -397,6 +395,41 @@
                         _ => todo!(),
                     }
                 }
+                tag::Types::MODEL => {
+                    let sync::tag::SyncId { pub_id } = from_value(shared_op.record_id).unwrap();
+
+                    match shared_op.data {
+                        SharedOperationData::Create(create_data) => match create_data {
+                            SharedOperationCreateData::Unique(create_data) => {
+                                db.tag()
+                                    .create(
+                                        pub_id,
+                                        create_data
+                                            .into_iter()
+                                            .flat_map(|(field, value)| {
+                                                tag::SetParam::deserialize(&field, value)
+                                            })
+                                            .collect(),
+                                    )
+                                    .exec()
+                                    .await?;
+                            }
+                            _ => unreachable!(),
+                        },
+                        SharedOperationData::Update { field, value } => {
+                            db.tag()
+                                .update(
+                                    tag::pub_id::equals(pub_id),
+                                    vec![tag::SetParam::deserialize(&field, value).unwrap()],
+                                )
+                                .exec()
+                                .await?;
+                        }
+                        SharedOperationData::Delete => {
+                            db.tag().delete(tag::pub_id::equals(pub_id)).exec().await?;
+                        }
+                    }
+                }
                 _ => todo!(),
             },
             _ => {}
@@ -441,7 +474,7 @@ impl SyncManager {
     pub fn owned_create_many<
         const SIZE: usize,
         TSyncId: SyncId<ModelTypes = TModel>,
-        TModel: SyncType<Marker = SharedSyncType>,
+        TModel: SyncType<Marker = OwnedSyncType>,
     >(
         &self,
         data: impl IntoIterator<Item = (TSyncId, [(&'static str, Value); SIZE])>,
@@ -467,13 +500,12 @@ impl SyncManager {
         }))
     }
 
     pub fn owned_update<
-        const SIZE: usize,
         TSyncId: SyncId<ModelTypes = TModel>,
-        TModel: SyncType<Marker = SharedSyncType>,
+        TModel: SyncType<Marker = OwnedSyncType>,
     >(
         &self,
         id: TSyncId,
-        values: [(&'static str, Value); SIZE],
+        values: impl IntoIterator<Item = (&'static str, Value)>,
     ) -> CRDTOperation {
         self.new_op(CRDTOperationType::Owned(OwnedOperation {
             model: TModel::MODEL.to_string(),
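Both bound changes in this region are fixes: owned_create_many and owned_update previously required Marker = SharedSyncType, so the owned helpers type-checked against shared models rather than the owned ones they exist for. With Marker = OwnedSyncType, @owned models go through owned operations and @shared models through shared ones, enforced at compile time. A minimal sketch of the marker pattern (assumed shapes, not sd_sync's actual definitions):

    struct OwnedSyncType;
    struct SharedSyncType;

    trait SyncType { type Marker; }

    struct LocationModel; // an @owned model
    impl SyncType for LocationModel { type Marker = OwnedSyncType; }

    struct TagModel; // a @shared model
    impl SyncType for TagModel { type Marker = SharedSyncType; }

    fn owned_update<M: SyncType<Marker = OwnedSyncType>>(_model: M) {}

    fn main() {
        owned_update(LocationModel);
        // owned_update(TagModel); // now a compile error, as intended
    }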
@ -502,6 +534,26 @@ impl SyncManager {
data: SharedOperationData::Create(SharedOperationCreateData::Atomic),
}))
}
pub fn unique_shared_create<
const SIZE: usize,
TSyncId: SyncId<ModelTypes = TModel>,
TModel: SyncType<Marker = SharedSyncType>,
>(
&self,
id: TSyncId,
values: [(&'static str, Value); SIZE],
) -> CRDTOperation {
self.new_op(CRDTOperationType::Shared(SharedOperation {
model: TModel::MODEL.to_string(),
record_id: json!(id),
data: SharedOperationData::Create(SharedOperationCreateData::Unique(
values
.into_iter()
.map(|(name, value)| (name.to_string(), value))
.collect(),
)),
}))
}
pub fn shared_update<
TSyncId: SyncId<ModelTypes = TModel>,
TModel: SyncType<Marker = SharedSyncType>,