Use codegen for ModelSyncType exec (#1098)

* use codegen for ModelSyncType exec

* folder client format

* hande create operation properly
This commit is contained in:
Brendan Allan 2023-07-15 11:22:04 +07:00 committed by GitHub
parent f9b32f5a4a
commit 0a1d721e58
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 768 additions and 547 deletions

1
.gitignore vendored
View file

@ -64,7 +64,6 @@ yalc.lock
todos.md
examples/*/*.lock
/target
prisma*.rs
playwright-report

28
Cargo.lock generated
View file

@ -5871,7 +5871,7 @@ dependencies = [
[[package]]
name = "prisma-client-rust"
version = "0.6.8"
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=e0af63224dcdb00a14c0aeade878894be8304cfc#e0af63224dcdb00a14c0aeade878894be8304cfc"
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=047c1102284a61ee400788baa7529d54ae745790#047c1102284a61ee400788baa7529d54ae745790"
dependencies = [
"base64 0.13.1",
"bigdecimal",
@ -5904,7 +5904,27 @@ dependencies = [
[[package]]
name = "prisma-client-rust-cli"
version = "0.6.8"
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=e0af63224dcdb00a14c0aeade878894be8304cfc#e0af63224dcdb00a14c0aeade878894be8304cfc"
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=047c1102284a61ee400788baa7529d54ae745790#047c1102284a61ee400788baa7529d54ae745790"
dependencies = [
"directories",
"flate2",
"http",
"prisma-client-rust-generator",
"proc-macro2",
"quote",
"regex",
"reqwest",
"serde",
"serde_json",
"serde_path_to_error",
"syn 1.0.109",
"thiserror",
]
[[package]]
name = "prisma-client-rust-generator"
version = "0.6.8"
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=047c1102284a61ee400788baa7529d54ae745790#047c1102284a61ee400788baa7529d54ae745790"
dependencies = [
"directories",
"flate2",
@ -5924,7 +5944,7 @@ dependencies = [
[[package]]
name = "prisma-client-rust-macros"
version = "0.6.8"
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=e0af63224dcdb00a14c0aeade878894be8304cfc#e0af63224dcdb00a14c0aeade878894be8304cfc"
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=047c1102284a61ee400788baa7529d54ae745790#047c1102284a61ee400788baa7529d54ae745790"
dependencies = [
"convert_case 0.6.0",
"proc-macro2",
@ -5936,7 +5956,7 @@ dependencies = [
[[package]]
name = "prisma-client-rust-sdk"
version = "0.6.8"
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=e0af63224dcdb00a14c0aeade878894be8304cfc#e0af63224dcdb00a14c0aeade878894be8304cfc"
source = "git+https://github.com/Brendonovich/prisma-client-rust?rev=047c1102284a61ee400788baa7529d54ae745790#047c1102284a61ee400788baa7529d54ae745790"
dependencies = [
"convert_case 0.5.0",
"dmmf",

View file

@ -18,19 +18,19 @@ edition = "2021"
repository = "https://github.com/spacedriveapp/spacedrive"
[workspace.dependencies]
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "e0af63224dcdb00a14c0aeade878894be8304cfc", features = [
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "047c1102284a61ee400788baa7529d54ae745790", features = [
"rspc",
"sqlite-create-many",
"migrations",
"sqlite",
], default-features = false }
prisma-client-rust-cli = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "e0af63224dcdb00a14c0aeade878894be8304cfc", features = [
prisma-client-rust-cli = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "047c1102284a61ee400788baa7529d54ae745790", features = [
"rspc",
"sqlite-create-many",
"migrations",
"sqlite",
], default-features = false }
prisma-client-rust-sdk = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "e0af63224dcdb00a14c0aeade878894be8304cfc", features = [
prisma-client-rust-sdk = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "047c1102284a61ee400788baa7529d54ae745790", features = [
"sqlite",
], default-features = false }

View file

@ -5,13 +5,15 @@ datasource db {
generator client {
provider = "cargo prisma"
output = "../../crates/prisma/src/prisma.rs"
output = "../../crates/prisma/src/prisma"
module_path = "sd_prisma::prisma"
client_format = "folder"
}
generator sync {
provider = "cargo prisma-sync"
output = "../../crates/prisma/src/prisma_sync.rs"
output = "../../crates/prisma/src/prisma_sync"
client_format = "folder"
}
//// Sync ////
@ -48,7 +50,7 @@ model Node {
@@map("node")
}
/// @local
/// @local(id: pub_id)
// represents a single `.db` file (SQLite DB) that is paired to the current library.
// A `LibraryInstance` is always owned by a single `Node` but it's possible for that node to change (or two to be owned by a single node).
model Instance {

View file

@ -10,7 +10,7 @@ use crate::{
library::Library,
object::tag::TagCreateArgs,
prisma::{tag, tag_on_object},
sync,
sync::{self, OperationFactory},
};
use super::{utils::library, Ctx, R};

View file

@ -185,6 +185,7 @@ pub async fn create_file_path(
use crate::{sync, util::db::uuid_to_bytes};
use sd_prisma::prisma;
use sd_sync::OperationFactory;
use serde_json::json;
use uuid::Uuid;
@ -225,30 +226,34 @@ pub async fn create_file_path(
let pub_id = uuid_to_bytes(Uuid::new_v4());
let created_path = sync
.write_op(
.write_ops(
db,
sync.unique_shared_create(
sync::file_path::SyncId {
pub_id: pub_id.clone(),
},
params,
(
sync.shared_create(
sync::file_path::SyncId {
pub_id: pub_id.clone(),
},
params,
),
db.file_path().create(pub_id, {
use file_path::*;
vec![
location::connect(prisma::location::id::equals(location.id)),
materialized_path::set(Some(materialized_path.into_owned())),
name::set(Some(name.into_owned())),
extension::set(Some(extension.into_owned())),
inode::set(Some(inode_to_db(metadata.inode))),
device::set(Some(device_to_db(metadata.device))),
cas_id::set(cas_id),
is_dir::set(Some(is_dir)),
size_in_bytes_bytes::set(Some(
metadata.size_in_bytes.to_be_bytes().to_vec(),
)),
date_created::set(Some(metadata.created_at.into())),
date_modified::set(Some(metadata.modified_at.into())),
]
}),
),
db.file_path().create(pub_id, {
use file_path::*;
vec![
location::connect(prisma::location::id::equals(location.id)),
materialized_path::set(Some(materialized_path.into_owned())),
name::set(Some(name.into_owned())),
extension::set(Some(extension.into_owned())),
inode::set(Some(inode_to_db(metadata.inode))),
device::set(Some(device_to_db(metadata.device))),
cas_id::set(cas_id),
is_dir::set(Some(is_dir)),
size_in_bytes_bytes::set(Some(metadata.size_in_bytes.to_be_bytes().to_vec())),
date_created::set(Some(metadata.created_at.into())),
date_modified::set(Some(metadata.modified_at.into())),
]
}),
)
.await?;

View file

@ -12,7 +12,7 @@ use std::path::Path;
use chrono::Utc;
use rspc::ErrorCode;
use sd_prisma::prisma_sync;
use sd_sync::*;
use serde::{Deserialize, Serialize};
use serde_json::json;
use thiserror::Error;
@ -109,7 +109,7 @@ async fn execute_indexer_save_step(
(
(
location::NAME,
json!(prisma_sync::location::SyncId {
json!(sync::location::SyncId {
pub_id: pub_id.clone()
}),
),
@ -159,7 +159,7 @@ async fn execute_indexer_save_step(
.unzip();
(
sync.unique_shared_create(
sync.shared_create(
sync::file_path::SyncId {
pub_id: uuid_to_bytes(entry.pub_id),
},
@ -174,7 +174,7 @@ async fn execute_indexer_save_step(
.write_ops(
db,
(
sync_stuff,
sync_stuff.into_iter().flatten().collect(),
db.file_path().create_many(paths).skip_duplicates(),
),
)

View file

@ -20,7 +20,7 @@ use crate::{
validation::hash::file_checksum,
},
prisma::{file_path, location, object},
sync,
sync::{self, OperationFactory},
util::{
db::{device_from_db, device_to_db, inode_from_db, inode_to_db, maybe_missing},
error::FileIOError,

View file

@ -21,6 +21,7 @@ use chrono::Utc;
use futures::future::TryFutureExt;
use normpath::PathExt;
use prisma_client_rust::{operator::and, or, QueryError};
use sd_sync::*;
use serde::Deserialize;
use serde_json::json;
use specta::Type;
@ -579,38 +580,41 @@ async fn create_location(
let date_created = Utc::now();
let location = sync
.write_op(
.write_ops(
db,
sync.unique_shared_create(
sync::location::SyncId {
pub_id: location_pub_id.as_bytes().to_vec(),
},
[
(location::name::NAME, json!(&name)),
(location::path::NAME, json!(&location_path)),
(location::date_created::NAME, json!(date_created)),
(
location::instance_id::NAME,
json!(sync::instance::SyncId {
id: library.config.instance_id,
}),
),
],
),
db.location()
.create(
location_pub_id.as_bytes().to_vec(),
vec![
location::name::set(Some(name.clone())),
location::path::set(Some(location_path)),
location::date_created::set(Some(date_created.into())),
location::instance_id::set(Some(library.config.instance_id)),
// location::instance::connect(instance::id::equals(
// library.config.instance_id.as_bytes().to_vec(),
// )),
(
sync.shared_create(
sync::location::SyncId {
pub_id: location_pub_id.as_bytes().to_vec(),
},
[
(location::name::NAME, json!(&name)),
(location::path::NAME, json!(&location_path)),
(location::date_created::NAME, json!(date_created)),
(
location::instance_id::NAME,
json!(sync::instance::SyncId {
pub_id: vec![],
// id: library.config.instance_id,
}),
),
],
)
.include(location_with_indexer_rules::include()),
),
db.location()
.create(
location_pub_id.as_bytes().to_vec(),
vec![
location::name::set(Some(name.clone())),
location::path::set(Some(location_path)),
location::date_created::set(Some(date_created.into())),
location::instance_id::set(Some(library.config.instance_id)),
// location::instance::connect(instance::id::equals(
// library.config.instance_id.as_bytes().to_vec(),
// )),
],
)
.include(location_with_indexer_rules::include()),
),
)
.await?;

View file

@ -6,8 +6,7 @@ use crate::{
},
object::{cas::generate_cas_id, object_for_file_identifier},
prisma::{file_path, location, object, PrismaClient},
sync,
sync::SyncManager,
sync::{self, CRDTOperation, OperationFactory, SyncManager},
util::{
db::{maybe_missing, uuid_to_bytes},
error::FileIOError,
@ -15,7 +14,6 @@ use crate::{
};
use sd_file_ext::{extensions::Extension, kind::ObjectKind};
use sd_sync::CRDTOperation;
use std::{
collections::{HashMap, HashSet},
@ -252,7 +250,7 @@ async fn identifier_job_step(
.unzip();
let object_creation_args = (
sync.unique_shared_create(sync_id(), sync_params),
sync.shared_create(sync_id(), sync_params),
object::create_unchecked(uuid_to_bytes(object_pub_id), db_params),
);
@ -274,7 +272,10 @@ async fn identifier_job_step(
.write_ops(db, {
let (sync, db_params): (Vec<_>, Vec<_>) = object_create_args.into_iter().unzip();
(sync, db.object().create_many(db_params))
(
sync.into_iter().flatten().collect(),
db.object().create_many(db_params),
)
})
.await
.unwrap_or_else(|e| {

View file

@ -1,6 +1,7 @@
pub mod seed;
use chrono::{DateTime, FixedOffset, Utc};
use sd_sync::*;
use serde::Deserialize;
use serde_json::json;
use specta::Type;
@ -23,25 +24,27 @@ impl TagCreateArgs {
let pub_id = Uuid::new_v4().as_bytes().to_vec();
let date_created: DateTime<FixedOffset> = Utc::now().into();
sync.write_op(
sync.write_ops(
db,
sync.unique_shared_create(
sync::tag::SyncId {
pub_id: pub_id.clone(),
},
[
(tag::name::NAME, json!(&self.name)),
(tag::color::NAME, json!(&self.color)),
(tag::date_created::NAME, json!(&date_created.to_rfc3339())),
],
),
db.tag().create(
pub_id,
vec![
tag::name::set(Some(self.name)),
tag::color::set(Some(self.color)),
tag::date_created::set(Some(date_created)),
],
(
sync.shared_create(
sync::tag::SyncId {
pub_id: pub_id.clone(),
},
[
(tag::name::NAME, json!(&self.name)),
(tag::color::NAME, json!(&self.color)),
(tag::date_created::NAME, json!(&date_created.to_rfc3339())),
],
),
db.tag().create(
pub_id,
vec![
tag::name::set(Some(self.name)),
tag::color::set(Some(self.color)),
tag::date_created::set(Some(date_created)),
],
),
),
)
.await

View file

@ -8,7 +8,7 @@ use crate::{
file_path_for_object_validator, IsolatedFilePathData,
},
prisma::{file_path, location},
sync,
sync::{self, OperationFactory},
util::{
db::{chain_optional_iter, maybe_missing},
error::FileIOError,

View file

@ -1,12 +1,11 @@
#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Brendan remove this once you've got error handling here
use crate::prisma::*;
use sd_sync::*;
use std::{collections::HashMap, sync::Arc};
use sd_sync::*;
use serde_json::{json, to_vec, Value};
use serde_json::to_vec;
use tokio::sync::broadcast::{self, Receiver, Sender};
use uhlc::{HLCBuilder, HLC, NTP64};
use uuid::Uuid;
@ -55,7 +54,7 @@ impl SyncManager {
.filter_map(|op| match &op.typ {
CRDTOperationType::Shared(shared_op) => {
let kind = match &shared_op.data {
SharedOperationData::Create(_) => "c",
SharedOperationData::Create => "c",
SharedOperationData::Update { .. } => "u",
SharedOperationData::Delete => "d",
};
@ -101,7 +100,7 @@ impl SyncManager {
let ret = match &op.typ {
CRDTOperationType::Shared(shared_op) => {
let kind = match &shared_op.data {
SharedOperationData::Create(_) => "c",
SharedOperationData::Create => "c",
SharedOperationData::Update { .. } => "u",
SharedOperationData::Delete => "d",
};
@ -177,138 +176,14 @@ impl SyncManager {
let msg = SyncMessage::Ingested(op.clone());
match ModelSyncData::from_op(op.typ.clone()).unwrap() {
ModelSyncData::FilePath(id, shared_op) => match shared_op {
SharedOperationData::Create(data) => {
let data: Vec<_> = data
.into_iter()
.flat_map(|(k, v)| file_path::SetParam::deserialize(&k, v))
.collect();
db.file_path()
.upsert(
file_path::pub_id::equals(id.pub_id.clone()),
file_path::create(id.pub_id, data.clone()),
data,
)
.exec()
.await?;
}
SharedOperationData::Update { field, value } => {
let data = vec![file_path::SetParam::deserialize(&field, value).unwrap()];
db.file_path()
.upsert(
file_path::pub_id::equals(id.pub_id.clone()),
file_path::create(id.pub_id, data.clone()),
data,
)
.exec()
.await?;
}
_ => todo!(),
},
ModelSyncData::Location(id, shared_op) => match shared_op {
SharedOperationData::Create(data) => {
let data: Vec<_> = data
.into_iter()
.flat_map(|(k, v)| location::SetParam::deserialize(&k, v))
.collect();
db.location()
.upsert(
location::pub_id::equals(id.pub_id.clone()),
location::create(id.pub_id, data.clone()),
data,
)
.exec()
.await?;
}
SharedOperationData::Update { field, value } => {
let data = vec![location::SetParam::deserialize(&field, value).unwrap()];
db.location()
.upsert(
location::pub_id::equals(id.pub_id.clone()),
location::create(id.pub_id, data.clone()),
data,
)
.exec()
.await?;
}
_ => todo!(),
},
ModelSyncData::Object(id, shared_op) => match shared_op {
SharedOperationData::Create(data) => {
let data: Vec<_> = data
.into_iter()
.flat_map(|(k, v)| object::SetParam::deserialize(&k, v))
.collect();
db.object()
.upsert(
object::pub_id::equals(id.pub_id.clone()),
object::create(id.pub_id, vec![]),
data,
)
.exec()
.await?;
}
SharedOperationData::Update { field, value } => {
let data = vec![object::SetParam::deserialize(&field, value).unwrap()];
db.object()
.upsert(
object::pub_id::equals(id.pub_id.clone()),
object::create(id.pub_id, data.clone()),
data,
)
.exec()
.await?;
}
_ => todo!(),
},
ModelSyncData::Tag(id, shared_op) => match shared_op {
SharedOperationData::Create(data) => {
let data: Vec<_> = data
.into_iter()
.flat_map(|(field, value)| tag::SetParam::deserialize(&field, value))
.collect();
db.tag()
.upsert(
tag::pub_id::equals(id.pub_id.clone()),
tag::create(id.pub_id, data.clone()),
data,
)
.exec()
.await?;
}
SharedOperationData::Update { field, value } => {
let data = vec![tag::SetParam::deserialize(&field, value).unwrap()];
db.tag()
.upsert(
tag::pub_id::equals(id.pub_id.clone()),
tag::create(id.pub_id, data.clone()),
data,
)
.exec()
.await?;
}
SharedOperationData::Delete => {
db.tag()
.delete(tag::pub_id::equals(id.pub_id))
.exec()
.await?;
}
},
ModelSyncData::Preference(_, _) => todo!(),
}
ModelSyncData::from_op(op.typ.clone())
.unwrap()
.exec(db)
.await?;
if let CRDTOperationType::Shared(shared_op) = op.typ {
let kind = match &shared_op.data {
SharedOperationData::Create(_) => "c",
SharedOperationData::Create => "c",
SharedOperationData::Update { .. } => "u",
SharedOperationData::Delete => "d",
};
@ -332,53 +207,14 @@ impl SyncManager {
Ok(())
}
}
fn new_op(&self, typ: CRDTOperationType) -> CRDTOperation {
let timestamp = self.clock.new_timestamp();
CRDTOperation {
instance: self.instance,
timestamp: *timestamp.get_time(),
id: Uuid::new_v4(),
typ,
}
impl OperationFactory for SyncManager {
fn get_clock(&self) -> &HLC {
&self.clock
}
pub fn unique_shared_create<
TSyncId: SyncId<ModelTypes = TModel>,
TModel: SyncType<Marker = SharedSyncType>,
>(
&self,
id: TSyncId,
values: impl IntoIterator<Item = (&'static str, Value)> + 'static,
) -> CRDTOperation {
self.new_op(CRDTOperationType::Shared(SharedOperation {
model: TModel::MODEL.to_string(),
record_id: json!(id),
data: SharedOperationData::Create(
values
.into_iter()
.map(|(name, value)| (name.to_string(), value))
.collect(),
),
}))
}
pub fn shared_update<
TSyncId: SyncId<ModelTypes = TModel>,
TModel: SyncType<Marker = SharedSyncType>,
>(
&self,
id: TSyncId,
field: &str,
value: Value,
) -> CRDTOperation {
self.new_op(CRDTOperationType::Shared(SharedOperation {
model: TModel::MODEL.to_string(),
record_id: json!(id),
data: SharedOperationData::Update {
field: field.to_string(),
value,
},
}))
fn get_instance(&self) -> Uuid {
self.instance
}
}

View file

@ -2,3 +2,4 @@ mod manager;
pub use crate::prisma_sync::*;
pub use manager::*;
pub use sd_sync::*;

1
crates/prisma/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
src/*/

View file

@ -1,10 +1,14 @@
mod attribute;
mod model;
mod sync_data;
use attribute::*;
use prisma_client_rust_sdk::{
prelude::*,
prisma::prisma_models::walkers::{FieldWalker, ModelWalker, RefinedFieldWalker},
prisma::prisma_models::walkers::{
FieldWalker, ModelWalker, RefinedFieldWalker, RelationFieldWalker,
},
};
#[derive(Debug, serde::Serialize, thiserror::Error)]
@ -13,51 +17,70 @@ enum Error {}
#[derive(serde::Deserialize)]
struct SDSyncGenerator {}
type FieldVec<'a> = Vec<FieldWalker<'a>>;
#[allow(unused)]
#[derive(Clone)]
enum ModelSyncType<'a> {
pub enum ModelSyncType<'a> {
Local {
id: FieldVec<'a>,
id: FieldWalker<'a>,
},
// Owned {
// id: FieldVec<'a>,
// },
Shared {
id: FieldVec<'a>,
id: FieldWalker<'a>,
},
Relation {
group: FieldVec<'a>,
item: FieldVec<'a>,
group: RelationFieldWalker<'a>,
item: RelationFieldWalker<'a>,
},
}
impl<'a> ModelSyncType<'a> {
fn from_attribute(attr: Attribute, model: ModelWalker<'a>) -> Option<Self> {
let id = attr
.field("id")
.map(|field| match field {
AttributeFieldValue::Single(s) => vec![*s],
AttributeFieldValue::List(l) => l.clone(),
})
.unwrap_or_else(|| {
model
.primary_key()
.as_ref()
.unwrap()
.fields()
.map(|f| f.name())
.collect()
})
.into_iter()
.flat_map(|name| model.fields().find(|f| f.name() == name))
.collect();
Some(match attr.name {
"local" => Self::Local { id },
"local" | "shared" => {
let id = attr
.field("id")
.and_then(|field| match field {
AttributeFieldValue::Single(s) => Some(s),
AttributeFieldValue::List(l) => None,
})
.and_then(|name| model.fields().find(|f| f.name() == *name))?;
match attr.name {
"local" => Self::Local { id },
"shared" => Self::Shared { id },
_ => return None,
}
}
"relation" => {
let get_field = |name| {
attr.field(name)
.and_then(|field| match field {
AttributeFieldValue::Single(s) => Some(*s),
AttributeFieldValue::List(l) => None,
})
.and_then(|name| {
match model
.fields()
.find(|f| f.name() == name)
.expect(&format!("'{name}' field not found"))
.refine()
{
RefinedFieldWalker::Relation(r) => Some(r),
_ => None,
}
})
.expect(&format!("'{name}' must be a relation field"))
};
Self::Relation {
item: get_field("item"),
group: get_field("group"),
}
}
// "owned" => Self::Owned { id },
"shared" => Self::Shared { id },
_ => return None,
})
}
@ -65,8 +88,9 @@ impl<'a> ModelSyncType<'a> {
fn sync_id(&self) -> Vec<FieldWalker> {
match self {
// Self::Owned { id } => id.clone(),
Self::Local { id } => id.clone(),
Self::Shared { id } => id.clone(),
Self::Local { id } => vec![id.clone()],
Self::Shared { id } => vec![id.clone()],
Self::Relation { group, item } => vec![(*group).into(), (*item).into()],
_ => vec![],
}
}
@ -85,13 +109,15 @@ impl ToTokens for ModelSyncType<'_> {
}
}
pub type ModelWithSyncType<'a> = (ModelWalker<'a>, Option<ModelSyncType<'a>>);
impl PrismaGenerator for SDSyncGenerator {
const NAME: &'static str = "SD Sync Generator";
const DEFAULT_OUTPUT: &'static str = "prisma-sync.rs";
type Error = Error;
fn generate(self, args: GenerateArgs) -> Result<String, Self::Error> {
fn generate(self, args: GenerateArgs) -> Result<Module, Self::Error> {
let db = &args.schema.db;
let models_with_sync_types = db
@ -106,213 +132,22 @@ impl PrismaGenerator for SDSyncGenerator {
})
.collect::<Vec<_>>();
let model_modules = models_with_sync_types.clone().into_iter().map(|(model, sync_type)| {
let model_name_snake = snake_ident(model.name());
let sync_id = sync_type.as_ref()
.map(|sync_type| {
let fields = sync_type.sync_id();
let fields = fields.iter().flat_map(|field| {
let name_snake = snake_ident(field.name());
let typ = match field.refine() {
RefinedFieldWalker::Scalar(_) => {
field.type_tokens(&quote!(self))
},
RefinedFieldWalker::Relation(relation)=> {
let relation_model_name_snake = snake_ident(relation.related_model().name());
Some(quote!(super::#relation_model_name_snake::SyncId))
},
};
Some(quote!(pub #name_snake: #typ))
});
quote! {
#[derive(serde::Serialize, serde::Deserialize)]
pub struct SyncId {
#(#fields),*
}
impl sd_sync::SyncId for SyncId {
type ModelTypes = #model_name_snake::Types;
}
impl sd_sync::SyncType for #model_name_snake::Types {
type SyncId = SyncId;
type Marker = sd_sync::#sync_type;
}
}
});
let set_param_impl = {
let field_matches = model.fields().filter_map(|field| {
let field_name_snake = snake_ident(field.name());
match field.refine() {
RefinedFieldWalker::Scalar(scalar_field) => {
(!scalar_field.is_in_required_relation()).then(|| quote! {
#model_name_snake::#field_name_snake::set(::serde_json::from_value(val).unwrap()),
})
},
RefinedFieldWalker::Relation(relation_field) => {
let relation_model_name_snake = snake_ident(relation_field.related_model().name());
match relation_field.referenced_fields() {
Some(i) => {
if i.count() == 1 {
Some(quote! {{
let val: std::collections::HashMap<String, ::serde_json::Value> = ::serde_json::from_value(val).unwrap();
let val = val.into_iter().next().unwrap();
#model_name_snake::#field_name_snake::connect(
#relation_model_name_snake::UniqueWhereParam::deserialize(&val.0, val.1).unwrap()
)
}})
} else { None }
},
_ => None
}
},
}.map(|body| quote!(#model_name_snake::#field_name_snake::NAME => #body))
});
match field_matches.clone().count() {
0 => quote!(),
_ => quote! {
impl #model_name_snake::SetParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None
})
}
}
}
}
};
let unique_param_impl = {
let field_matches = model
.unique_criterias()
.flat_map(|criteria| match &criteria.fields().next() {
Some(field) if criteria.fields().len() == 1 => {
let field_name_snake = snake_ident(field.name());
Some(quote!(#model_name_snake::#field_name_snake::NAME =>
#model_name_snake::#field_name_snake::equals(
::serde_json::from_value(val).unwrap()
),
))
}
_ => None,
})
.collect::<Vec<_>>();
match field_matches.len() {
0 => quote!(),
_ => quote! {
impl #model_name_snake::UniqueWhereParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None
})
}
}
},
}
};
quote! {
pub mod #model_name_snake {
use super::prisma::*;
#sync_id
#set_param_impl
#unique_param_impl
}
}
});
let model_sync_data = {
let (variants, matches): (Vec<_>, Vec<_>) = models_with_sync_types
.into_iter()
.filter_map(|(model, sync_type)| {
let model_name_snake = snake_ident(model.name());
let model_name_pascal = pascal_ident(model.name());
sync_type.and_then(|a| {
let data_type = match a {
// ModelSyncType::Owned { .. } => quote!(OwnedOperationData),
ModelSyncType::Shared { .. } => quote!(SharedOperationData),
ModelSyncType::Relation { .. } => {
quote!(RelationOperationData)
}
_ => return None,
};
let variant = quote! {
#model_name_pascal(#model_name_snake::SyncId, sd_sync::#data_type)
};
let op_type_enum = quote!(sd_sync::CRDTOperationType);
let cond = quote!(if op.model == prisma::#model_name_snake::NAME);
let match_case = match a {
// ModelSyncType::Owned { .. } => {
// quote! {
// #op_type_enum::Owned(op) #cond =>
// Self::#model_name_pascal(serde_json::from_value(op.record_id).ok()?, op.data)
// }
// }
ModelSyncType::Shared { .. } => {
quote! {
#op_type_enum::Shared(op) #cond =>
Self::#model_name_pascal(serde_json::from_value(op.record_id).ok()?, op.data)
}
}
// ModelSyncType::Relation { .. } => {
// quote! {
// (#model_name_str, sd_sync::CRDTOperation::Relation(op)) =>
// Self::#model_name_pascal()
// }
// }
_ => return None,
};
Some((variant, match_case))
})
})
.unzip();
let model_sync_data = sync_data::r#enum(models_with_sync_types.clone());
let mut module = Module::new(
"root",
quote! {
pub enum ModelSyncData {
#(#variants),*
}
use crate::prisma;
impl ModelSyncData {
pub fn from_op(op: sd_sync::CRDTOperationType) -> Option<Self> {
Some(match op {
#(#matches),*,
_ => return None
})
}
}
}
};
#model_sync_data
},
);
models_with_sync_types
.into_iter()
.map(model::module)
.for_each(|model| module.add_submodule(model));
Ok(quote! {
use crate::prisma;
#model_sync_data
#(#model_modules)*
}
.to_string())
Ok(module)
}
}

View file

@ -0,0 +1,163 @@
use prisma_client_rust_sdk::{prelude::*, prisma::prisma_models::walkers::RefinedFieldWalker};
use crate::{ModelSyncType, ModelWithSyncType};
pub fn module((model, sync_type): ModelWithSyncType) -> Module {
let model_name_snake = snake_ident(model.name());
let sync_id = sync_type.as_ref().map(|sync_type| {
let fields = sync_type.sync_id();
let fields = fields.iter().flat_map(|field| {
let name_snake = snake_ident(field.name());
let typ = match field.refine() {
RefinedFieldWalker::Scalar(_) => field.type_tokens(&quote!(self)),
RefinedFieldWalker::Relation(relation) => {
let relation_model_name_snake = snake_ident(relation.related_model().name());
Some(quote!(super::#relation_model_name_snake::SyncId))
}
};
Some(quote!(pub #name_snake: #typ))
});
let model_stuff = match sync_type {
ModelSyncType::Relation { item, group } => {
let item_name_snake = snake_ident(item.name());
let item_model_name_snake = snake_ident(item.related_model().name());
let group_name_snake = snake_ident(group.name());
let group_model_name_snake = snake_ident(group.related_model().name());
Some(quote! {
impl sd_sync::RelationSyncId for SyncId {
type ItemSyncId = super::#item_model_name_snake::SyncId;
type GroupSyncId = super::#group_model_name_snake::SyncId;
fn split(&self) -> (&Self::ItemSyncId, &Self::GroupSyncId) {
(
&self.#item_name_snake,
&self.#group_name_snake
)
}
}
impl sd_sync::RelationSyncModel for #model_name_snake::Types {
type SyncId = SyncId;
}
})
}
ModelSyncType::Shared { .. } => Some(quote! {
impl sd_sync::SharedSyncModel for #model_name_snake::Types {
type SyncId = SyncId;
}
}),
_ => None,
};
quote! {
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct SyncId {
#(#fields),*
}
impl sd_sync::SyncId for SyncId {
type Model = #model_name_snake::Types;
}
#model_stuff
}
});
let set_param_impl = {
let field_matches = model.fields().filter_map(|field| {
let field_name_snake = snake_ident(field.name());
match field.refine() {
RefinedFieldWalker::Scalar(scalar_field) => {
(!scalar_field.is_in_required_relation()).then(|| quote! {
#model_name_snake::#field_name_snake::set(::serde_json::from_value(val).unwrap()),
})
},
RefinedFieldWalker::Relation(relation_field) => {
let relation_model_name_snake = snake_ident(relation_field.related_model().name());
match relation_field.referenced_fields() {
Some(i) => {
if i.count() == 1 {
Some(quote! {{
let val: std::collections::HashMap<String, ::serde_json::Value> = ::serde_json::from_value(val).unwrap();
let val = val.into_iter().next().unwrap();
#model_name_snake::#field_name_snake::connect(
#relation_model_name_snake::UniqueWhereParam::deserialize(&val.0, val.1).unwrap()
)
}})
} else { None }
},
_ => None
}
},
}.map(|body| quote!(#model_name_snake::#field_name_snake::NAME => #body))
});
match field_matches.clone().count() {
0 => quote!(),
_ => quote! {
impl #model_name_snake::SetParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None
})
}
}
},
}
};
let unique_param_impl = {
let field_matches = model
.unique_criterias()
.flat_map(|criteria| match &criteria.fields().next() {
Some(field) if criteria.fields().len() == 1 => {
let field_name_snake = snake_ident(field.name());
Some(quote!(#model_name_snake::#field_name_snake::NAME =>
#model_name_snake::#field_name_snake::equals(
::serde_json::from_value(val).unwrap()
),
))
}
_ => None,
})
.collect::<Vec<_>>();
match field_matches.len() {
0 => quote!(),
_ => quote! {
impl #model_name_snake::UniqueWhereParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None
})
}
}
},
}
};
Module::new(
model.name(),
quote! {
use super::prisma::*;
#sync_id
#set_param_impl
#unique_param_impl
},
)
}

View file

@ -0,0 +1,223 @@
use prisma_client_rust_sdk::{prelude::*, prisma::prisma_models::walkers::RelationFieldWalker};
use crate::{ModelSyncType, ModelWithSyncType};
pub fn r#enum(models: Vec<ModelWithSyncType>) -> TokenStream {
let (variants, matches): (Vec<_>, Vec<_>) = models
.iter()
.filter_map(|(model, sync_type)| {
let model_name_snake = snake_ident(model.name());
let model_name_pascal = pascal_ident(model.name());
sync_type.as_ref().and_then(|a| {
let data_type = match a {
// ModelSyncType::Owned { .. } => quote!(OwnedOperationData),
ModelSyncType::Shared { .. } => quote!(SharedOperationData),
ModelSyncType::Relation { .. } => {
quote!(RelationOperationData)
}
_ => return None,
};
let variant = quote! {
#model_name_pascal(#model_name_snake::SyncId, sd_sync::#data_type)
};
let op_type_enum = quote!(sd_sync::CRDTOperationType);
let match_case = match a {
// ModelSyncType::Owned { .. } => {
// quote! {
// #op_type_enum::Owned(op) #cond =>
// Self::#model_name_pascal(serde_json::from_value(op.record_id).ok()?, op.data)
// }
// }
ModelSyncType::Shared { .. } => {
quote! {
#op_type_enum::Shared(op) if op.model == prisma::#model_name_snake::NAME =>
Self::#model_name_pascal(serde_json::from_value(op.record_id).ok()?, op.data)
}
}
ModelSyncType::Relation { item, group } => {
let item_name_snake = snake_ident(item.name());
let group_name_snake = snake_ident(group.name());
quote! {
#op_type_enum::Relation(op) if op.relation == prisma::#model_name_snake::NAME =>
Self::#model_name_pascal(
#model_name_snake::SyncId {
#item_name_snake: serde_json::from_value(op.relation_item).ok()?,
#group_name_snake: serde_json::from_value(op.relation_group).ok()?,
},
op.data
)
}
}
_ => return None,
};
Some((variant, match_case))
})
})
.unzip();
let exec_matches = models.iter().filter_map(|(model, sync_type)| {
let model_name_pascal = pascal_ident(model.name());
let model_name_snake = snake_ident(model.name());
let match_arms = match sync_type.as_ref()? {
ModelSyncType::Shared { id } => {
let id_name_snake = snake_ident(id.name());
quote! {
match data {
sd_sync::SharedOperationData::Create => {
db.#model_name_snake()
.upsert(
prisma::#model_name_snake::#id_name_snake::equals(id.#id_name_snake.clone()),
prisma::#model_name_snake::create(id.#id_name_snake, vec![]),
vec![]
)
.exec()
.await?;
},
sd_sync::SharedOperationData::Update { field, value } => {
let data = vec![
prisma::#model_name_snake::SetParam::deserialize(&field, value).unwrap()
];
db.#model_name_snake()
.upsert(
prisma::#model_name_snake::#id_name_snake::equals(id.#id_name_snake.clone()),
prisma::#model_name_snake::create(id.#id_name_snake, data.clone()),
data,
)
.exec()
.await?;
},
sd_sync::SharedOperationData::Delete => {
db.#model_name_snake()
.delete(prisma::#model_name_snake::#id_name_snake::equals(id.#id_name_snake))
.exec()
.await?;
},
}
}
}
ModelSyncType::Relation { item, group } => {
let compound_id = format_ident!(
"{}",
item.fields()
.unwrap()
.chain(group.fields().unwrap())
.map(|f| f.name())
.collect::<Vec<_>>()
.join("_")
);
let db_batch_items = {
let batch_item = |item: &RelationFieldWalker| {
let item_model_name_snake = snake_ident(item.related_model().name());
let item_field_name_snake = snake_ident(item.name());
quote!(db.#item_model_name_snake().find_unique(
prisma::#item_model_name_snake::pub_id::equals(id.#item_field_name_snake.pub_id.clone())
))
};
[batch_item(item), batch_item(group)]
};
let create_items = {
let create_item = |item: &RelationFieldWalker, var: TokenStream| {
let item_model_name_snake = snake_ident(item.related_model().name());
quote!(
prisma::#item_model_name_snake::id::equals(#var.id)
)
};
[
create_item(item, quote!(item)),
create_item(group, quote!(group)),
]
};
quote! {
let (Some(item), Some(group)) =
db._batch((#(#db_batch_items),*)).await? else {
panic!("item and group not found!");
};
let id = prisma::tag_on_object::#compound_id(item.id, group.id);
match data {
sd_sync::RelationOperationData::Create => {
db.#model_name_snake()
.create(
#(#create_items),*,
vec![],
)
.exec()
.await
.ok();
},
sd_sync::RelationOperationData::Update { field, value } => {
let data = vec![prisma::#model_name_snake::SetParam::deserialize(&field, value).unwrap()];
db.#model_name_snake()
.upsert(
id,
prisma::#model_name_snake::create(
#(#create_items),*,
data.clone(),
),
data,
)
.exec()
.await
.ok();
},
sd_sync::RelationOperationData::Delete => {
db.#model_name_snake()
.delete(id)
.exec()
.await
.ok();
},
}
}
}
_ => return None,
};
Some(quote! {
Self::#model_name_pascal(id, data) => {
#match_arms
}
})
});
quote! {
pub enum ModelSyncData {
#(#variants),*
}
impl ModelSyncData {
pub fn from_op(op: sd_sync::CRDTOperationType) -> Option<Self> {
Some(match op {
#(#matches),*,
_ => return None
})
}
pub async fn exec(self, db: &prisma::PrismaClient) -> prisma_client_rust::Result<()> {
match self {
#(#exec_matches),*
}
Ok(())
}
}
}
}

View file

@ -1,22 +1,25 @@
use std::fmt::Debug;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use serde_json::Value;
use specta::Type;
use uhlc::NTP64;
use uuid::Uuid;
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
pub enum RelationOperationData {
#[serde(rename = "c")]
Create,
#[serde(rename = "u")]
Update { field: String, value: Value },
#[serde(rename = "d")]
Delete,
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
pub struct RelationOperation {
pub relation_item: Uuid,
pub relation_group: Uuid,
pub relation_item: Value,
pub relation_group: Value,
pub relation: String,
pub data: RelationOperationData,
}
@ -24,7 +27,7 @@ pub struct RelationOperation {
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
pub enum SharedOperationData {
#[serde(rename = "c")]
Create(Map<String, Value>),
Create,
#[serde(rename = "u")]
Update { field: String, value: Value },
#[serde(rename = "d")]

126
crates/sync/src/factory.rs Normal file
View file

@ -0,0 +1,126 @@
use serde_json::{json, Value};
use uhlc::HLC;
use uuid::Uuid;
use crate::*;
pub trait OperationFactory {
fn get_clock(&self) -> &HLC;
fn get_instance(&self) -> Uuid;
fn new_op(&self, typ: CRDTOperationType) -> CRDTOperation {
let timestamp = self.get_clock().new_timestamp();
CRDTOperation {
instance: self.get_instance(),
timestamp: *timestamp.get_time(),
id: Uuid::new_v4(),
typ,
}
}
fn shared_op<TSyncId: SyncId<Model = TModel>, TModel: SharedSyncModel>(
&self,
id: &TSyncId,
data: SharedOperationData,
) -> CRDTOperation {
self.new_op(CRDTOperationType::Shared(SharedOperation {
model: TModel::MODEL.to_string(),
record_id: json!(id),
data,
}))
}
fn shared_create<TSyncId: SyncId<Model = TModel>, TModel: SharedSyncModel>(
&self,
id: TSyncId,
values: impl IntoIterator<Item = (&'static str, Value)> + 'static,
) -> Vec<CRDTOperation> {
[self.shared_op(&id, SharedOperationData::Create)]
.into_iter()
.chain(values.into_iter().map(|(name, value)| {
self.shared_op(
&id,
SharedOperationData::Update {
field: name.to_string(),
value,
},
)
}))
.collect()
}
fn shared_update<TSyncId: SyncId<Model = TModel>, TModel: SharedSyncModel>(
&self,
id: TSyncId,
field: impl Into<String>,
value: Value,
) -> CRDTOperation {
self.shared_op(
&id,
SharedOperationData::Update {
field: field.into(),
value,
},
)
}
fn shared_delete<TSyncId: SyncId<Model = TModel>, TModel: SharedSyncModel>(
&self,
id: TSyncId,
) -> CRDTOperation {
self.shared_op(&id, SharedOperationData::Delete)
}
fn relation_op<TSyncId: RelationSyncId<Model = TModel>, TModel: RelationSyncModel>(
&self,
id: &TSyncId,
data: RelationOperationData,
) -> CRDTOperation {
let (item_id, group_id) = id.split();
self.new_op(CRDTOperationType::Relation(RelationOperation {
relation_item: json!(item_id),
relation_group: json!(group_id),
relation: TModel::MODEL.to_string(),
data,
}))
}
fn relation_create<TSyncId: RelationSyncId<Model = TModel>, TModel: RelationSyncModel>(
&self,
id: TSyncId,
values: impl IntoIterator<Item = (&'static str, Value)> + 'static,
) -> Vec<CRDTOperation> {
[self.relation_op(&id, RelationOperationData::Create)]
.into_iter()
.chain(values.into_iter().map(|(name, value)| {
self.relation_op(
&id,
RelationOperationData::Update {
field: name.to_string(),
value,
},
)
}))
.collect()
}
fn relation_update<TSyncId: RelationSyncId<Model = TModel>, TModel: RelationSyncModel>(
&self,
id: TSyncId,
field: impl Into<String>,
value: Value,
) -> CRDTOperation {
self.relation_op(
&id,
RelationOperationData::Update {
field: field.into(),
value,
},
)
}
fn relation_delete<TSyncId: RelationSyncId<Model = TModel>, TModel: RelationSyncModel>(
&self,
id: TSyncId,
) -> CRDTOperation {
self.relation_op(&id, RelationOperationData::Delete)
}
}

View file

@ -1,29 +1,7 @@
mod crdt;
mod factory;
mod model_traits;
pub use crdt::*;
use prisma_client_rust::ModelTypes;
use serde::{de::DeserializeOwned, Serialize};
pub trait SyncId: Serialize + DeserializeOwned {
type ModelTypes: SyncType;
}
pub trait SyncType: ModelTypes {
type SyncId: SyncId;
type Marker: SyncTypeMarker;
}
pub trait SyncTypeMarker {}
pub struct LocalSyncType;
impl SyncTypeMarker for LocalSyncType {}
pub struct OwnedSyncType;
impl SyncTypeMarker for OwnedSyncType {}
pub struct SharedSyncType;
impl SyncTypeMarker for SharedSyncType {}
pub struct RelationSyncType;
impl SyncTypeMarker for RelationSyncType {}
pub use factory::*;
pub use model_traits::*;

View file

@ -0,0 +1,25 @@
use prisma_client_rust::ModelTypes;
use serde::{de::DeserializeOwned, Serialize};
pub trait SyncId: Serialize + DeserializeOwned {
type Model: ModelTypes;
}
pub trait LocalSyncModel: ModelTypes {
type SyncId: SyncId;
}
pub trait SharedSyncModel: ModelTypes {
type SyncId: SyncId;
}
pub trait RelationSyncId: SyncId {
type ItemSyncId: SyncId;
type GroupSyncId: SyncId;
fn split(&self) -> (&Self::ItemSyncId, &Self::GroupSyncId);
}
pub trait RelationSyncModel: ModelTypes {
type SyncId: RelationSyncId;
}

View file

@ -11,13 +11,9 @@ const OperationItem = ({ op }: { op: CRDTOperation }) => {
if ('model' in op.typ) {
let subContents = null;
if (op.typ.data === 'd') {
subContents = 'Delete';
} else if ('c' in op.typ.data) {
subContents = 'Create';
} else {
subContents = `Update - ${op.typ.data.u.field}`;
}
if (op.typ.data === 'd') subContents = 'Delete';
else if (op.typ.data === 'c') subContents = 'Create';
else subContents = `Update - ${op.typ.data.u.field}`;
contents = (
<>

View file

@ -263,9 +263,9 @@ export type PeerId = string
export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; version: string | null; email: string | null; img_url: string | null; instances: string[] }
export type RelationOperation = { relation_item: string; relation_group: string; relation: string; data: RelationOperationData }
export type RelationOperation = { relation_item: any; relation_group: any; relation: string; data: RelationOperationData }
export type RelationOperationData = "Create" | { Update: { field: string; value: any } } | "Delete"
export type RelationOperationData = "c" | { u: { field: string; value: any } } | "d"
export type RenameFileArgs = { location_id: number; kind: RenameKind }
@ -287,7 +287,7 @@ export type SetNoteArgs = { id: number; note: string | null }
export type SharedOperation = { record_id: any; model: string; data: SharedOperationData }
export type SharedOperationData = { c: { [key: string]: any } } | { u: { field: string; value: any } } | "d"
export type SharedOperationData = "c" | { u: { field: string; value: any } } | "d"
export type SortOrder = "Asc" | "Desc"