diff --git a/Cargo.lock b/Cargo.lock index 126cdd1ef..b3ac4d0b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7353,6 +7353,8 @@ checksum = "2e0e0214a4a2b444ecce41a4025792fc31f77c7bb89c46d253953ea8c65701ec" dependencies = [ "num-traits", "rmp", + "serde", + "serde_bytes", ] [[package]] @@ -7684,6 +7686,7 @@ dependencies = [ "prisma-client-rust", "reqwest", "rmp-serde", + "rmpv", "sd-core-sync", "sd-file-path-helper", "sd-prisma", @@ -7726,6 +7729,7 @@ version = "0.1.0" dependencies = [ "base64 0.21.7", "reqwest", + "rmpv", "rspc", "sd-p2p2", "serde", @@ -7779,6 +7783,7 @@ dependencies = [ "prisma-client-rust", "regex", "reqwest", + "rmp", "rmp-serde", "rmpv", "rspc", @@ -7831,6 +7836,8 @@ name = "sd-core-sync" version = "0.0.0" dependencies = [ "prisma-client-rust", + "rmp-serde", + "rmpv", "sd-prisma", "sd-sync", "sd-utils", @@ -8119,6 +8126,8 @@ name = "sd-prisma" version = "0.1.0" dependencies = [ "prisma-client-rust", + "rmp-serde", + "rmpv", "sd-cache", "sd-sync", "serde", @@ -8144,6 +8153,9 @@ name = "sd-sync" version = "0.1.0" dependencies = [ "prisma-client-rust", + "rmp", + "rmp-serde", + "rmpv", "serde", "serde_json", "specta", @@ -8318,6 +8330,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_bytes" +version = "0.11.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab33ec92f677585af6d88c65593ae2375adde54efdbf16d597f2cbc7a6d368ff" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.196" diff --git a/Cargo.toml b/Cargo.toml index 9cf2fbdb1..cd0b5cf59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,18 +1,18 @@ [workspace] resolver = "2" members = [ - "core", - "core/crates/*", - "crates/*", - # "crates/p2p/tunnel", - # "crates/p2p/tunnel/utils", - "apps/cli", - "apps/desktop/src-tauri", - "apps/desktop/crates/*", - "apps/mobile/modules/sd-core/core", - "apps/mobile/modules/sd-core/android/crate", - "apps/mobile/modules/sd-core/ios/crate", - "apps/server", + "core", + "core/crates/*", + "crates/*", + # "crates/p2p/tunnel", + # "crates/p2p/tunnel/utils", + "apps/cli", + "apps/desktop/src-tauri", + "apps/desktop/crates/*", + "apps/mobile/modules/sd-core/core", + "apps/mobile/modules/sd-core/android/crate", + "apps/mobile/modules/sd-core/ios/crate", + "apps/server", ] [workspace.package] @@ -23,19 +23,19 @@ repository = "https://github.com/spacedriveapp/spacedrive" [workspace.dependencies] # First party dependencies prisma-client-rust = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [ - "rspc", - "sqlite-create-many", - "migrations", - "sqlite", + "rspc", + "sqlite-create-many", + "migrations", + "sqlite", ], default-features = false } prisma-client-rust-cli = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [ - "rspc", - "sqlite-create-many", - "migrations", - "sqlite", + "rspc", + "sqlite-create-many", + "migrations", + "sqlite", ], default-features = false } prisma-client-rust-sdk = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [ - "sqlite", + "sqlite", ], default-features = false } tracing = "0.1.40" @@ -71,6 +71,7 @@ rand_chacha = "0.3.1" regex = "1.10.2" reqwest = "0.11.22" rmp-serde = "1.1.2" +rmpv = { version = "^1.0.1", features = ["with-serde"] } serde = "1.0" serde_json = "1.0" strum = "0.25" diff --git a/core/Cargo.toml b/core/Cargo.toml index a9d80fa7b..61036baaf 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -23,18 +23,18 @@ sd-cache = { path = "../crates/cache" } sd-core-sync = { path = "./crates/sync" } # sd-cloud-api = { path = "../crates/cloud-api" } sd-crypto = { path = "../crates/crypto", features = [ - "rspc", - "specta", - "serde", - "keymanager", + "rspc", + "specta", + "serde", + "keymanager", ] } sd-file-path-helper = { path = "../crates/file-path-helper" } sd-ffmpeg = { path = "../crates/ffmpeg", optional = true } sd-file-ext = { path = "../crates/file-ext" } sd-images = { path = "../crates/images", features = [ - "rspc", - "serde", - "specta", + "rspc", + "serde", + "specta", ] } sd-media-metadata = { path = "../crates/media-metadata" } sd-p2p2 = { path = "../crates/p2p2", features = ["specta"] } @@ -65,12 +65,12 @@ regex = { workspace = true } reqwest = { workspace = true, features = ["json", "native-tls-vendored"] } rmp-serde = { workspace = true } rspc = { workspace = true, features = [ - "axum", - "uuid", - "chrono", - "tracing", - "alpha", - "unstable", + "axum", + "uuid", + "chrono", + "tracing", + "alpha", + "unstable", ] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } @@ -80,12 +80,12 @@ strum_macros = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = [ - "sync", - "rt-multi-thread", - "io-util", - "macros", - "time", - "process", + "sync", + "rt-multi-thread", + "io-util", + "macros", + "time", + "process", ] } tokio-stream = { workspace = true, features = ["fs"] } tokio-util = { workspace = true, features = ["io"] } @@ -111,10 +111,10 @@ int-enum = "0.5.0" itertools = "0.12.0" libc = "0.2.153" mini-moka = "0.10.2" -notify = { git="https://github.com/notify-rs/notify.git", rev="c3929ed114fbb0bc7457a9a498260461596b00ca", default-features = false, features = [ - "macos_fsevent", +notify = { git = "https://github.com/notify-rs/notify.git", rev = "c3929ed114fbb0bc7457a9a498260461596b00ca", default-features = false, features = [ + "macos_fsevent", ] } -rmpv = "^1.0.1" +rmpv = { workspace = true } serde-hashkey = "0.4.5" serde_repr = "0.1" serde_with = "3.4.0" @@ -129,6 +129,7 @@ base91 = "0.1.0" sd-actors = { version = "0.1.0", path = "../crates/actors" } tower-service = "0.3.2" hyper = { version = "=0.14.28", features = ["http1", "server", "client"] } +rmp = "0.8.12" # Override features of transitive dependencies [dependencies.openssl] diff --git a/core/crates/sync/Cargo.toml b/core/crates/sync/Cargo.toml index c564aa56c..4e8ad0062 100644 --- a/core/crates/sync/Cargo.toml +++ b/core/crates/sync/Cargo.toml @@ -18,3 +18,5 @@ tokio = { workspace = true } uuid = { workspace = true } tracing = { workspace = true } uhlc = { workspace = true } +rmp-serde = "1.1.2" +rmpv = { workspace = true } diff --git a/core/crates/sync/src/backfill.rs b/core/crates/sync/src/backfill.rs index 832315e22..d32e3998f 100644 --- a/core/crates/sync/src/backfill.rs +++ b/core/crates/sync/src/backfill.rs @@ -9,10 +9,15 @@ use sd_prisma::{ }; use sd_sync::{option_sync_entry, OperationFactory}; use sd_utils::chain_optional_iter; -use serde_json::json; use crate::crdt_op_unchecked_db; +macro_rules! msgpack { + ($e:expr) => { + ::rmpv::ext::to_value($e).expect("failed to serialize msgpack") + } +} + pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, instance_id: i32) { db._transaction() .with_timeout(9999999999) @@ -254,12 +259,14 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta chain_optional_iter( [], [ - t.name.map(|v| (tag::name::NAME, json!(v))), - t.color.map(|v| (tag::color::NAME, json!(v))), - t.date_created - .map(|v| (tag::date_created::NAME, json!(v))), - t.date_modified - .map(|v| (tag::date_modified::NAME, json!(v))), + t.name.map(|v| (tag::name::NAME, msgpack!(v))), + t.color.map(|v| (tag::color::NAME, msgpack!(v))), + t.date_created.map(|v| { + (tag::date_created::NAME, msgpack!(v)) + }), + t.date_modified.map(|v| { + (tag::date_modified::NAME, msgpack!(v)) + }), ], ), ) @@ -272,43 +279,53 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta ) .await?; - let tag_on_objects = db - .tag_on_object() - .find_many(vec![]) - .include(tag_on_object::include!({ - tag: select { pub_id } - object: select { pub_id } - })) - .exec() - .await?; - db.crdt_operation() - .create_many( - tag_on_objects - .into_iter() - .flat_map(|t_o| { - sync.relation_create( - prisma_sync::tag_on_object::SyncId { - tag: prisma_sync::tag::SyncId { - pub_id: t_o.tag.pub_id, - }, - object: prisma_sync::object::SyncId { - pub_id: t_o.object.pub_id, - }, - }, - chain_optional_iter( - [], - [option_sync_entry!( - t_o.date_created, - tag_on_object::date_created - )], - ), - ) - }) - .map(|o| crdt_op_unchecked_db(&o, instance_id)) - .collect(), - ) - .exec() - .await?; + paginate_relation( + |group_id, item_id| { + db.tag_on_object() + .find_many(vec![ + tag_on_object::tag_id::gt(group_id), + tag_on_object::object_id::gt(item_id), + ]) + .order_by(tag_on_object::tag_id::order(SortOrder::Asc)) + .order_by(tag_on_object::object_id::order(SortOrder::Asc)) + .include(tag_on_object::include!({ + tag: select { pub_id } + object: select { pub_id } + })) + .exec() + }, + |t_o| (t_o.tag_id, t_o.object_id), + |tag_on_objects| { + db.crdt_operation() + .create_many( + tag_on_objects + .into_iter() + .flat_map(|t_o| { + sync.relation_create( + prisma_sync::tag_on_object::SyncId { + tag: prisma_sync::tag::SyncId { + pub_id: t_o.tag.pub_id, + }, + object: prisma_sync::object::SyncId { + pub_id: t_o.object.pub_id, + }, + }, + chain_optional_iter( + [], + [option_sync_entry!( + t_o.date_created, + tag_on_object::date_created + )], + ), + ) + }) + .map(|o| crdt_op_unchecked_db(&o, instance_id)) + .collect(), + ) + .exec() + }, + ) + .await?; paginate( |cursor| { @@ -327,8 +344,8 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta sync.shared_create( prisma_sync::label::SyncId { name: l.name }, [ - (label::date_created::NAME, json!(l.date_created)), - (label::date_modified::NAME, json!(l.date_modified)), + (label::date_created::NAME, msgpack!(l.date_created)), + (label::date_modified::NAME, msgpack!(l.date_modified)), ], ) }) @@ -340,39 +357,50 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta ) .await?; - let label_on_objects = db - .label_on_object() - .find_many(vec![]) - .select(label_on_object::select!({ - object: select { pub_id } - label: select { name } - })) - .exec() - .await?; - let res = db - .crdt_operation() - .create_many( - label_on_objects - .into_iter() - .flat_map(|l_o| { - sync.relation_create( - prisma_sync::label_on_object::SyncId { - label: prisma_sync::label::SyncId { - name: l_o.label.name, - }, - object: prisma_sync::object::SyncId { - pub_id: l_o.object.pub_id, - }, - }, - [], - ) - }) - .map(|o| crdt_op_unchecked_db(&o, instance_id)) - .collect(), - ) - .exec() - .await; + let res = paginate_relation( + |group_id, item_id| { + db.label_on_object() + .find_many(vec![ + label_on_object::label_id::gt(group_id), + label_on_object::object_id::gt(item_id), + ]) + .order_by(label_on_object::label_id::order(SortOrder::Asc)) + .order_by(label_on_object::object_id::order(SortOrder::Asc)) + .include(label_on_object::include!({ + object: select { pub_id } + label: select { name } + })) + .exec() + }, + |l_o| (l_o.label_id, l_o.object_id), + |label_on_objects| { + db.crdt_operation() + .create_many( + label_on_objects + .into_iter() + .flat_map(|l_o| { + sync.relation_create( + prisma_sync::label_on_object::SyncId { + label: prisma_sync::label::SyncId { + name: l_o.label.name, + }, + object: prisma_sync::object::SyncId { + pub_id: l_o.object.pub_id, + }, + }, + [], + ) + }) + .map(|o| crdt_op_unchecked_db(&o, instance_id)) + .collect(), + ) + .exec() + }, + ) + .await; + println!("backfill ended"); + res }) .await @@ -389,7 +417,7 @@ async fn paginate< id: impl Fn(&T) -> i32, operations: impl Fn(Vec) -> TOperations, ) -> Result<(), E> { - let mut next_cursor = Some(0); + let mut next_cursor = Some(-1); loop { let Some(cursor) = next_cursor else { break; @@ -402,3 +430,27 @@ async fn paginate< Ok(()) } + +async fn paginate_relation< + T, + E: std::fmt::Debug, + TGetter: Future, E>>, + TOperations: Future>, +>( + getter: impl Fn(i32, i32) -> TGetter, + id: impl Fn(&T) -> (i32, i32), + operations: impl Fn(Vec) -> TOperations, +) -> Result<(), E> { + let mut next_cursor = Some((-1, -1)); + loop { + let Some(cursor) = next_cursor else { + break; + }; + + let items = getter(cursor.0, cursor.1).await?; + next_cursor = items.last().map(&id); + operations(items).await?; + } + + Ok(()) +} diff --git a/core/src/api/files.rs b/core/src/api/files.rs index cd4202fb5..10cdceeef 100644 --- a/core/src/api/files.rs +++ b/core/src/api/files.rs @@ -26,8 +26,7 @@ use sd_prisma::{ prisma_sync, }; use sd_sync::OperationFactory; -use sd_utils::{db::maybe_missing, error::FileIOError}; -use serde_json::json; +use sd_utils::{db::maybe_missing, error::FileIOError, msgpack}; use std::{ ffi::OsString, @@ -204,7 +203,7 @@ pub(crate) fn mount() -> AlphaRouter { pub_id: object.pub_id, }, object::note::NAME, - json!(&args.note), + msgpack!(&args.note), ), db.object().update( object::id::equals(args.id), @@ -250,7 +249,7 @@ pub(crate) fn mount() -> AlphaRouter { pub_id: object.pub_id, }, object::favorite::NAME, - json!(&args.favorite), + msgpack!(&args.favorite), ), db.object().update( object::id::equals(args.id), @@ -316,7 +315,7 @@ pub(crate) fn mount() -> AlphaRouter { sync.shared_update( prisma_sync::object::SyncId { pub_id: d.pub_id }, object::date_accessed::NAME, - json!(date_accessed), + msgpack!(date_accessed), ), d.id, ) @@ -359,7 +358,7 @@ pub(crate) fn mount() -> AlphaRouter { sync.shared_update( prisma_sync::object::SyncId { pub_id: d.pub_id }, object::date_accessed::NAME, - json!(null), + msgpack!(null), ), d.id, ) diff --git a/core/src/api/tags.rs b/core/src/api/tags.rs index 762f9c198..f409996e0 100644 --- a/core/src/api/tags.rs +++ b/core/src/api/tags.rs @@ -6,7 +6,7 @@ use sd_prisma::{ prisma_sync, }; use sd_sync::OperationFactory; -use sd_utils::uuid_to_bytes; +use sd_utils::{msgpack, uuid_to_bytes}; use std::collections::BTreeMap; @@ -14,7 +14,6 @@ use chrono::{DateTime, Utc}; use itertools::{Either, Itertools}; use rspc::{alpha::AlphaRouter, ErrorCode}; use serde::{Deserialize, Serialize}; -use serde_json::json; use specta::Type; use uuid::Uuid; @@ -234,7 +233,7 @@ pub(crate) fn mount() -> AlphaRouter { pub_id: fp.pub_id.clone(), }, file_path::object::NAME, - json!(id), + msgpack!(id), )); ( @@ -329,8 +328,8 @@ pub(crate) fn mount() -> AlphaRouter { db, ( [ - args.name.as_ref().map(|v| (tag::name::NAME, json!(v))), - args.color.as_ref().map(|v| (tag::color::NAME, json!(v))), + args.name.as_ref().map(|v| (tag::name::NAME, msgpack!(v))), + args.color.as_ref().map(|v| (tag::color::NAME, msgpack!(v))), ] .into_iter() .flatten() diff --git a/core/src/cloud/sync/mod.rs b/core/src/cloud/sync/mod.rs index 5abd6619a..7aeaf76fe 100644 --- a/core/src/cloud/sync/mod.rs +++ b/core/src/cloud/sync/mod.rs @@ -1,6 +1,5 @@ use sd_sync::*; use serde::{Deserialize, Serialize}; -use serde_json::Value; use std::sync::{atomic, Arc}; use tokio::sync::Notify; use uuid::Uuid; @@ -80,7 +79,7 @@ macro_rules! err_break { } pub(crate) use err_break; -pub type CompressedCRDTOperationsForModel = Vec<(Value, Vec)>; +pub type CompressedCRDTOperationsForModel = Vec<(rmpv::Value, Vec)>; #[derive(Serialize, Deserialize)] pub struct CompressedCRDTOperations(Vec<(Uuid, Vec<(String, CompressedCRDTOperationsForModel)>)>); @@ -168,7 +167,7 @@ impl CompressedCRDTOperations { } } -#[derive(PartialEq, Eq, Serialize, Deserialize, Clone)] +#[derive(PartialEq, Serialize, Deserialize, Clone)] pub struct CompressedCRDTOperation { pub timestamp: NTP64, pub id: Uuid, diff --git a/core/src/cloud/sync/receive.rs b/core/src/cloud/sync/receive.rs index fb1db8f9d..8a03afbc1 100644 --- a/core/src/cloud/sync/receive.rs +++ b/core/src/cloud/sync/receive.rs @@ -155,10 +155,9 @@ pub async fn run_actor( e.insert(NTP64(0)); } - let compressed_operations: CompressedCRDTOperations = - err_break!(serde_json::from_slice(err_break!( - &BASE64_STANDARD.decode(collection.contents) - ))); + let compressed_operations: CompressedCRDTOperations = err_break!( + rmp_serde::from_slice(err_break!(&BASE64_STANDARD.decode(collection.contents))) + ); err_break!(write_cloud_ops_to_db(compressed_operations.into_ops(), &db).await); diff --git a/core/src/cloud/sync/send.rs b/core/src/cloud/sync/send.rs index a2259db66..3bae25e7c 100644 --- a/core/src/cloud/sync/send.rs +++ b/core/src/cloud/sync/send.rs @@ -68,13 +68,17 @@ pub async fn run_actor( let ops_len = ops.len(); + use base64::prelude::*; + instances.push(do_add::Input { uuid: req_add.instance_uuid, key: req_add.key, start_time, end_time, - contents: serde_json::to_value(CompressedCRDTOperations::new(ops)) - .expect("CompressedCRDTOperation should serialize!"), + contents: BASE64_STANDARD.encode( + rmp_serde::to_vec_named(&CompressedCRDTOperations::new(ops)) + .expect("CompressedCRDTOperation should serialize!"), + ), ops_count: ops_len, }) } diff --git a/core/src/location/indexer/mod.rs b/core/src/location/indexer/mod.rs index 53a2ad51e..815eefec3 100644 --- a/core/src/location/indexer/mod.rs +++ b/core/src/location/indexer/mod.rs @@ -8,7 +8,7 @@ use sd_prisma::{ prisma_sync, }; use sd_sync::*; -use sd_utils::{db::inode_to_db, error::FileIOError, from_bytes_to_uuid}; +use sd_utils::{db::inode_to_db, error::FileIOError, from_bytes_to_uuid, msgpack}; use std::{collections::HashMap, path::Path}; @@ -18,7 +18,6 @@ use itertools::Itertools; use prisma_client_rust::operator::or; use rspc::ErrorCode; use serde::{Deserialize, Serialize}; -use serde_json::json; use thiserror::Error; use tracing::{trace, warn}; @@ -110,7 +109,7 @@ async fn execute_indexer_save_step( ( ( location::NAME, - json!(prisma_sync::location::SyncId { + msgpack!(prisma_sync::location::SyncId { pub_id: location.pub_id.clone() }), ), @@ -195,11 +194,9 @@ async fn execute_indexer_update_step( let (sync_params, db_params): (Vec<_>, Vec<_>) = [ // As this file was updated while Spacedrive was offline, we mark the object_id and cas_id as null // So this file_path will be updated at file identifier job - should_unlink_object.then_some(( - (object_id::NAME, serde_json::Value::Null), - object::disconnect(), - )), - Some(((cas_id::NAME, serde_json::Value::Null), cas_id::set(None))), + should_unlink_object + .then_some(((object_id::NAME, msgpack!(null)), object::disconnect())), + Some(((cas_id::NAME, msgpack!(null)), cas_id::set(None))), Some(sync_db_entry!(*is_dir, is_dir)), Some(sync_db_entry!( entry.metadata.size_in_bytes.to_be_bytes().to_vec(), @@ -531,7 +528,7 @@ pub async fn reverse_update_directories_sizes( pub_id: pub_id.clone(), }, file_path::size_in_bytes_bytes::NAME, - json!(size_bytes.clone()), + msgpack!(size_bytes.clone()), ), db.file_path().update( file_path::pub_id::equals(pub_id), diff --git a/core/src/location/indexer/old_indexer_job.rs b/core/src/location/indexer/old_indexer_job.rs index a0c39b628..0c7c96a38 100644 --- a/core/src/location/indexer/old_indexer_job.rs +++ b/core/src/location/indexer/old_indexer_job.rs @@ -18,7 +18,7 @@ use sd_prisma::{ prisma_sync, }; use sd_sync::*; -use sd_utils::{db::maybe_missing, from_bytes_to_uuid}; +use sd_utils::{db::maybe_missing, from_bytes_to_uuid, msgpack}; use std::{ collections::HashMap, @@ -599,7 +599,7 @@ async fn update_directories_sizes( pub_id: file_path.pub_id.clone(), }, file_path::size_in_bytes_bytes::NAME, - json!(size_bytes.clone()), + msgpack!(size_bytes.clone()), ), db.file_path().update( file_path::pub_id::equals(file_path.pub_id), diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index abad73888..233828718 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -33,7 +33,7 @@ use sd_sync::OperationFactory; use sd_utils::{ db::{inode_from_db, inode_to_db, maybe_missing}, error::FileIOError, - uuid_to_bytes, + msgpack, uuid_to_bytes, }; #[cfg(target_family = "unix")] @@ -53,7 +53,6 @@ use std::{ use chrono::{DateTime, FixedOffset, Local, Utc}; use notify::Event; -use serde_json::json; use tokio::{ fs, io::{self, ErrorKind}, @@ -273,8 +272,8 @@ async fn inner_create_file( pub_id: pub_id.clone(), }, [ - (object::date_created::NAME, json!(date_created)), - (object::kind::NAME, json!(int_kind)), + (object::date_created::NAME, msgpack!(date_created)), + (object::kind::NAME, msgpack!(int_kind)), ], ), db.object() @@ -298,7 +297,7 @@ async fn inner_create_file( pub_id: created_file.pub_id.clone(), }, file_path::object::NAME, - json!(prisma_sync::object::SyncId { + msgpack!(prisma_sync::object::SyncId { pub_id: object_pub_id.clone() }), ), @@ -475,13 +474,13 @@ async fn inner_update_file( [ ( - (cas_id::NAME, json!(file_path.cas_id)), + (cas_id::NAME, msgpack!(file_path.cas_id)), Some(cas_id::set(file_path.cas_id.clone())), ), ( ( size_in_bytes_bytes::NAME, - json!(fs_metadata.len().to_be_bytes().to_vec()), + msgpack!(fs_metadata.len().to_be_bytes().to_vec()), ), Some(size_in_bytes_bytes::set(Some( fs_metadata.len().to_be_bytes().to_vec(), @@ -491,7 +490,7 @@ async fn inner_update_file( let date = DateTime::::from(fs_metadata.modified_or_now()).into(); ( - (date_modified::NAME, json!(date)), + (date_modified::NAME, msgpack!(date)), Some(date_modified::set(Some(date))), ) }, @@ -509,28 +508,28 @@ async fn inner_update_file( }; ( - (integrity_checksum::NAME, json!(checksum)), + (integrity_checksum::NAME, msgpack!(checksum)), Some(integrity_checksum::set(checksum)), ) }, { if current_inode != inode { ( - (inode::NAME, json!(inode)), + (inode::NAME, msgpack!(inode)), Some(inode::set(Some(inode_to_db(inode)))), ) } else { - ((inode::NAME, serde_json::Value::Null), None) + ((inode::NAME, msgpack!(null)), None) } }, { if is_hidden != file_path.hidden.unwrap_or_default() { ( - (hidden::NAME, json!(inode)), + (hidden::NAME, msgpack!(inode)), Some(hidden::set(Some(is_hidden))), ) } else { - ((hidden::NAME, serde_json::Value::Null), None) + ((hidden::NAME, msgpack!(null)), None) } }, ] @@ -582,7 +581,7 @@ async fn inner_update_file( pub_id: object.pub_id.clone(), }, object::kind::NAME, - json!(int_kind), + msgpack!(int_kind), ), db.object().update( object::id::equals(object.id), @@ -604,8 +603,8 @@ async fn inner_update_file( pub_id: pub_id.clone(), }, [ - (object::date_created::NAME, json!(date_created)), - (object::kind::NAME, json!(int_kind)), + (object::date_created::NAME, msgpack!(date_created)), + (object::kind::NAME, msgpack!(int_kind)), ], ), db.object().create( @@ -626,7 +625,7 @@ async fn inner_update_file( pub_id: file_path.pub_id.clone(), }, file_path::object::NAME, - json!(prisma_sync::object::SyncId { + msgpack!(prisma_sync::object::SyncId { pub_id: pub_id.clone() }), ), @@ -731,7 +730,7 @@ async fn inner_update_file( pub_id: file_path.pub_id.clone(), }, file_path::hidden::NAME, - json!(is_hidden), + msgpack!(is_hidden), )], db.file_path().update( file_path::pub_id::equals(file_path.pub_id.clone()), @@ -828,7 +827,7 @@ pub(super) async fn rename( sync.shared_update( sd_prisma::prisma_sync::file_path::SyncId { pub_id }, file_path::materialized_path::NAME, - json!(&new_path), + msgpack!(&new_path), ), db.file_path().update( file_path::id::equals(id), @@ -851,24 +850,24 @@ pub(super) async fn rename( ( ( file_path::materialized_path::NAME, - json!(new_path_materialized_str), + msgpack!(new_path_materialized_str), ), file_path::materialized_path::set(Some(new_path_materialized_str)), ), ( - (file_path::name::NAME, json!(new_parts.name)), + (file_path::name::NAME, msgpack!(new_parts.name)), file_path::name::set(Some(new_parts.name.to_string())), ), ( - (file_path::extension::NAME, json!(new_parts.extension)), + (file_path::extension::NAME, msgpack!(new_parts.extension)), file_path::extension::set(Some(new_parts.extension.to_string())), ), ( - (file_path::date_modified::NAME, json!(&date_modified)), + (file_path::date_modified::NAME, msgpack!(&date_modified)), file_path::date_modified::set(Some(date_modified)), ), ( - (file_path::hidden::NAME, json!(is_hidden)), + (file_path::hidden::NAME, msgpack!(is_hidden)), file_path::hidden::set(Some(is_hidden)), ), ] diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index ce0b29dd9..e856cc3c7 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -18,7 +18,7 @@ use sd_sync::*; use sd_utils::{ db::{maybe_missing, MissingFieldError}, error::{FileIOError, NonUtf8PathError}, - uuid_to_bytes, + msgpack, uuid_to_bytes, }; use sd_file_path_helper::IsolatedFilePathDataParts; @@ -296,31 +296,31 @@ impl LocationUpdateArgs { .filter(|name| location.name.as_ref() != Some(name)) .map(|v| { ( - (location::name::NAME, json!(v)), + (location::name::NAME, msgpack!(v)), location::name::set(Some(v)), ) }), self.generate_preview_media.map(|v| { ( - (location::generate_preview_media::NAME, json!(v)), + (location::generate_preview_media::NAME, msgpack!(v)), location::generate_preview_media::set(Some(v)), ) }), self.sync_preview_media.map(|v| { ( - (location::sync_preview_media::NAME, json!(v)), + (location::sync_preview_media::NAME, msgpack!(v)), location::sync_preview_media::set(Some(v)), ) }), self.hidden.map(|v| { ( - (location::hidden::NAME, json!(v)), + (location::hidden::NAME, msgpack!(v)), location::hidden::set(Some(v)), ) }), self.path.clone().map(|v| { ( - (location::path::NAME, json!(v)), + (location::path::NAME, msgpack!(v)), location::path::set(Some(v)), ) }), @@ -567,7 +567,7 @@ pub async fn relink_location( pub_id: pub_id.clone(), }, location::path::NAME, - json!(path), + msgpack!(path), ), db.location().update( location::pub_id::equals(pub_id.clone()), @@ -685,12 +685,12 @@ async fn create_location( pub_id: location_pub_id.as_bytes().to_vec(), }, [ - (location::name::NAME, json!(&name)), - (location::path::NAME, json!(&path)), - (location::date_created::NAME, json!(date_created)), + (location::name::NAME, msgpack!(&name)), + (location::path::NAME, msgpack!(&path)), + (location::date_created::NAME, msgpack!(date_created)), ( location::instance::NAME, - json!(prisma_sync::instance::SyncId { + msgpack!(prisma_sync::instance::SyncId { pub_id: uuid_to_bytes(sync.instance) }), ), @@ -1072,48 +1072,48 @@ pub async fn create_file_path( ( ( location::NAME, - json!(prisma_sync::location::SyncId { + msgpack!(prisma_sync::location::SyncId { pub_id: location.pub_id }), ), location::connect(prisma::location::id::equals(location.id)), ), - ((cas_id::NAME, json!(cas_id)), cas_id::set(cas_id)), + ((cas_id::NAME, msgpack!(cas_id)), cas_id::set(cas_id)), ( - (materialized_path::NAME, json!(materialized_path)), + (materialized_path::NAME, msgpack!(materialized_path)), materialized_path::set(Some(materialized_path.into())), ), - ((name::NAME, json!(name)), name::set(Some(name.into()))), + ((name::NAME, msgpack!(name)), name::set(Some(name.into()))), ( - (extension::NAME, json!(extension)), + (extension::NAME, msgpack!(extension)), extension::set(Some(extension.into())), ), ( ( size_in_bytes_bytes::NAME, - json!(metadata.size_in_bytes.to_be_bytes().to_vec()), + msgpack!(metadata.size_in_bytes.to_be_bytes().to_vec()), ), size_in_bytes_bytes::set(Some(metadata.size_in_bytes.to_be_bytes().to_vec())), ), ( - (inode::NAME, json!(metadata.inode.to_le_bytes())), + (inode::NAME, msgpack!(metadata.inode.to_le_bytes())), inode::set(Some(inode_to_db(metadata.inode))), ), - ((is_dir::NAME, json!(is_dir)), is_dir::set(Some(is_dir))), + ((is_dir::NAME, msgpack!(is_dir)), is_dir::set(Some(is_dir))), ( - (date_created::NAME, json!(metadata.created_at)), + (date_created::NAME, msgpack!(metadata.created_at)), date_created::set(Some(metadata.created_at.into())), ), ( - (date_modified::NAME, json!(metadata.modified_at)), + (date_modified::NAME, msgpack!(metadata.modified_at)), date_modified::set(Some(metadata.modified_at.into())), ), ( - (date_indexed::NAME, json!(indexed_at)), + (date_indexed::NAME, msgpack!(indexed_at)), date_indexed::set(Some(indexed_at.into())), ), ( - (hidden::NAME, json!(metadata.hidden)), + (hidden::NAME, msgpack!(metadata.hidden)), hidden::set(Some(metadata.hidden)), ), ] diff --git a/core/src/object/media/mod.rs b/core/src/object/media/mod.rs index 566e8d3e2..4eee9c3ca 100644 --- a/core/src/object/media/mod.rs +++ b/core/src/object/media/mod.rs @@ -30,7 +30,7 @@ pub fn media_data_image_to_query( pub fn media_data_image_to_query_params( mdi: ImageMetadata, -) -> (Vec<(&'static str, serde_json::Value)>, Vec) { +) -> (Vec<(&'static str, rmpv::Value)>, Vec) { use sd_sync::option_sync_db_entry; use sd_utils::chain_optional_iter; diff --git a/core/src/object/old_file_identifier/mod.rs b/core/src/object/old_file_identifier/mod.rs index 0e0ad3968..e8a0c988f 100644 --- a/core/src/object/old_file_identifier/mod.rs +++ b/core/src/object/old_file_identifier/mod.rs @@ -11,7 +11,7 @@ use sd_prisma::{ prisma_sync, }; use sd_sync::{CRDTOperation, OperationFactory}; -use sd_utils::{db::maybe_missing, error::FileIOError, uuid_to_bytes}; +use sd_utils::{db::maybe_missing, error::FileIOError, msgpack, uuid_to_bytes}; use std::{ collections::{HashMap, HashSet}, @@ -20,7 +20,6 @@ use std::{ }; use futures::future::join_all; -use serde_json::json; use tokio::fs; use tracing::{error, trace}; use uuid::Uuid; @@ -165,7 +164,7 @@ async fn identifier_job_step( pub_id: sd_utils::uuid_to_bytes(*pub_id), }, file_path::cas_id::NAME, - json!(&metadata.cas_id), + msgpack!(&metadata.cas_id), ), db.file_path().update( file_path::pub_id::equals(sd_utils::uuid_to_bytes(*pub_id)), @@ -279,11 +278,11 @@ async fn identifier_job_step( let (sync_params, db_params): (Vec<_>, Vec<_>) = [ ( - (object::date_created::NAME, json!(date_created)), + (object::date_created::NAME, msgpack!(date_created)), object::date_created::set(*date_created), ), ( - (object::kind::NAME, json!(kind)), + (object::kind::NAME, msgpack!(kind)), object::kind::set(Some(kind)), ), ] @@ -366,7 +365,7 @@ fn connect_file_path_to_object<'db>( pub_id: sd_utils::uuid_to_bytes(file_path_id), }, file_path::object::NAME, - json!(prisma_sync::object::SyncId { + msgpack!(prisma_sync::object::SyncId { pub_id: vec_id.clone() }), ), diff --git a/core/src/object/tag/mod.rs b/core/src/object/tag/mod.rs index 9e0aa485f..240c5c4f2 100644 --- a/core/src/object/tag/mod.rs +++ b/core/src/object/tag/mod.rs @@ -5,8 +5,8 @@ use sd_sync::*; use chrono::{DateTime, FixedOffset, Utc}; +use sd_utils::msgpack; use serde::Deserialize; -use serde_json::json; use specta::Type; use uuid::Uuid; @@ -34,10 +34,13 @@ impl TagCreateArgs { pub_id: pub_id.clone(), }, [ - (tag::name::NAME, json!(&self.name)), - (tag::color::NAME, json!(&self.color)), - (tag::is_hidden::NAME, json!(false)), - (tag::date_created::NAME, json!(&date_created.to_rfc3339())), + (tag::name::NAME, msgpack!(&self.name)), + (tag::color::NAME, msgpack!(&self.color)), + (tag::is_hidden::NAME, msgpack!(false)), + ( + tag::date_created::NAME, + msgpack!(&date_created.to_rfc3339()), + ), ], ), db.tag().create( diff --git a/core/src/object/validation/old_validator_job.rs b/core/src/object/validation/old_validator_job.rs index 717dc475f..f8e2fbff2 100644 --- a/core/src/object/validation/old_validator_job.rs +++ b/core/src/object/validation/old_validator_job.rs @@ -14,7 +14,7 @@ use sd_prisma::{ prisma_sync, }; use sd_sync::OperationFactory; -use sd_utils::{db::maybe_missing, error::FileIOError}; +use sd_utils::{db::maybe_missing, error::FileIOError, msgpack}; use std::{ hash::{Hash, Hasher}, @@ -162,7 +162,7 @@ impl StatefulJob for OldObjectValidatorJobInit { pub_id: file_path.pub_id.clone(), }, file_path::integrity_checksum::NAME, - json!(&checksum), + msgpack!(&checksum), ), db.file_path().update( file_path::pub_id::equals(file_path.pub_id.clone()), diff --git a/core/src/p2p/sync/mod.rs b/core/src/p2p/sync/mod.rs index 54187e8e5..9d8791589 100644 --- a/core/src/p2p/sync/mod.rs +++ b/core/src/p2p/sync/mod.rs @@ -69,7 +69,7 @@ mod originator { instance: Uuid::new_v4(), timestamp: sync::NTP64(0), id: Uuid::new_v4(), - record_id: serde_json::Value::Null, + record_id: rmpv::Value::Nil, model: "name".to_string(), data: sd_sync::CRDTOperationData::Create, }]); diff --git a/crates/ai/Cargo.toml b/crates/ai/Cargo.toml index dfe97c9f5..ca16579c8 100644 --- a/crates/ai/Cargo.toml +++ b/crates/ai/Cargo.toml @@ -37,6 +37,7 @@ uuid = { workspace = true, features = ["v4", "serde"] } # Note: Keep same version as used by ort ndarray = { version = "0.15.6" } half = { version = "2.1", features = ['num-traits'] } +rmpv.workspace = true # Microsoft does not provide a release for osx-gpu. See: https://github.com/microsoft/onnxruntime/releases # "gpu" means CUDA or TensorRT EP. Thus, the ort crate cannot download them at build time. diff --git a/crates/ai/src/old_image_labeler/process.rs b/crates/ai/src/old_image_labeler/process.rs index a41a10ca7..92eda51ed 100644 --- a/crates/ai/src/old_image_labeler/process.rs +++ b/crates/ai/src/old_image_labeler/process.rs @@ -4,7 +4,7 @@ use sd_prisma::{ prisma_sync, }; use sd_sync::OperationFactory; -use sd_utils::{db::MissingFieldError, error::FileIOError}; +use sd_utils::{db::MissingFieldError, error::FileIOError, msgpack}; use std::{ collections::{BTreeMap, HashMap, HashSet, VecDeque}, @@ -16,7 +16,6 @@ use async_channel as chan; use chrono::{DateTime, FixedOffset, Utc}; use futures_concurrency::future::{Join, Race}; use image::ImageFormat; -use serde_json::json; use tokio::{ fs, spawn, sync::{oneshot, OwnedRwLockReadGuard, OwnedSemaphorePermit, RwLock, Semaphore}, @@ -432,7 +431,7 @@ pub async fn assign_labels( .map(|name| { sync_params.extend(sync.shared_create( prisma_sync::label::SyncId { name: name.clone() }, - [(label::date_created::NAME, json!(&date_created))], + [(label::date_created::NAME, msgpack!(&date_created))], )); db.label() diff --git a/crates/cloud-api/Cargo.toml b/crates/cloud-api/Cargo.toml index 9daa89966..c27076ca8 100644 --- a/crates/cloud-api/Cargo.toml +++ b/crates/cloud-api/Cargo.toml @@ -15,3 +15,4 @@ uuid.workspace = true rspc = { workspace = true } specta.workspace = true base64.workspace = true +rmpv.workspace = true diff --git a/crates/cloud-api/src/lib.rs b/crates/cloud-api/src/lib.rs index f7c8861b7..a599a7f0c 100644 --- a/crates/cloud-api/src/lib.rs +++ b/crates/cloud-api/src/lib.rs @@ -450,7 +450,7 @@ pub mod library { pub key: String, pub start_time: String, pub end_time: String, - pub contents: serde_json::Value, + pub contents: String, pub ops_count: usize, } diff --git a/crates/prisma/Cargo.toml b/crates/prisma/Cargo.toml index d10047c86..b458e268e 100644 --- a/crates/prisma/Cargo.toml +++ b/crates/prisma/Cargo.toml @@ -10,3 +10,5 @@ sd-sync = { path = "../sync" } prisma-client-rust = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +rmp-serde = "1.1.2" +rmpv.workspace = true diff --git a/crates/sync-generator/src/lib.rs b/crates/sync-generator/src/lib.rs index 5dbee5571..ecbc96492 100644 --- a/crates/sync-generator/src/lib.rs +++ b/crates/sync-generator/src/lib.rs @@ -79,7 +79,6 @@ impl<'a> ModelSyncType<'a> { group: get_field("group"), } } - // "owned" => Self::Owned { id }, _ => return None, }) diff --git a/crates/sync-generator/src/model.rs b/crates/sync-generator/src/model.rs index 58e8616f0..d94c7fd38 100644 --- a/crates/sync-generator/src/model.rs +++ b/crates/sync-generator/src/model.rs @@ -71,41 +71,47 @@ pub fn module((model, sync_type): ModelWithSyncType) -> Module { let set_param_impl = { let field_matches = model.fields().filter_map(|field| { - let field_name_snake = snake_ident(field.name()); + let field_name_snake = snake_ident(field.name()); - match field.refine() { - RefinedFieldWalker::Scalar(scalar_field) => { - (!scalar_field.is_in_required_relation()).then(|| quote! { - #model_name_snake::#field_name_snake::set(::serde_json::from_value(val).unwrap()), - }) - }, - RefinedFieldWalker::Relation(relation_field) => { - let relation_model_name_snake = snake_ident(relation_field.related_model().name()); - - match relation_field.referenced_fields() { - Some(i) => { - if i.count() == 1 { - Some(quote! {{ - let val: std::collections::HashMap = ::serde_json::from_value(val).unwrap(); - let val = val.into_iter().next().unwrap(); - - #model_name_snake::#field_name_snake::connect( - #relation_model_name_snake::UniqueWhereParam::deserialize(&val.0, val.1).unwrap() - ) - }}) - } else { None } - }, - _ => None + match field.refine() { + RefinedFieldWalker::Scalar(scalar_field) => { + (!scalar_field.is_in_required_relation()).then(|| { + quote! { + #model_name_snake::#field_name_snake::set(::rmpv::ext::from_value(val).unwrap()), } - }, - }.map(|body| quote!(#model_name_snake::#field_name_snake::NAME => #body)) - }); + }) + } + RefinedFieldWalker::Relation(relation_field) => { + let relation_model_name_snake = + snake_ident(relation_field.related_model().name()); + + match relation_field.referenced_fields() { + Some(i) => { + if i.count() == 1 { + Some(quote! {{ + let val: std::collections::HashMap = ::rmpv::ext::from_value(val).unwrap(); + let val = val.into_iter().next().unwrap(); + + #model_name_snake::#field_name_snake::connect( + #relation_model_name_snake::UniqueWhereParam::deserialize(&val.0, val.1).unwrap() + ) + }}) + } else { + None + } + } + _ => None, + } + } + } + .map(|body| quote!(#model_name_snake::#field_name_snake::NAME => #body)) + }); match field_matches.clone().count() { 0 => quote!(), _ => quote! { impl #model_name_snake::SetParam { - pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option { + pub fn deserialize(field: &str, val: ::rmpv::Value) -> Option { Some(match field { #(#field_matches)* _ => return None @@ -125,7 +131,7 @@ pub fn module((model, sync_type): ModelWithSyncType) -> Module { Some(quote!(#model_name_snake::#field_name_snake::NAME => #model_name_snake::#field_name_snake::equals( - ::serde_json::from_value(val).unwrap() + ::rmpv::ext::from_value(val).unwrap() ), )) } @@ -137,7 +143,7 @@ pub fn module((model, sync_type): ModelWithSyncType) -> Module { 0 => quote!(), _ => quote! { impl #model_name_snake::UniqueWhereParam { - pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option { + pub fn deserialize(field: &str, val: ::rmpv::Value) -> Option { Some(match field { #(#field_matches)* _ => return None diff --git a/crates/sync-generator/src/sync_data.rs b/crates/sync-generator/src/sync_data.rs index 5b398f9a4..5f63d8e43 100644 --- a/crates/sync-generator/src/sync_data.rs +++ b/crates/sync-generator/src/sync_data.rs @@ -25,7 +25,7 @@ pub fn r#enum(models: Vec) -> TokenStream { quote!(#model_name_pascal(#model_name_snake::SyncId, sd_sync::CRDTOperationData)), quote! { prisma::#model_name_snake::NAME => - Self::#model_name_pascal(serde_json::from_value(op.record_id).ok()?, op.data) + Self::#model_name_pascal(rmpv::ext::from_value(op.record_id).ok()?, op.data) }, ) }) diff --git a/crates/sync/Cargo.toml b/crates/sync/Cargo.toml index 416a49f20..f734df205 100644 --- a/crates/sync/Cargo.toml +++ b/crates/sync/Cargo.toml @@ -7,6 +7,9 @@ edition = { workspace = true } [dependencies] prisma-client-rust = { workspace = true } +rmp = "0.8.12" +rmp-serde = "1.1.2" +rmpv = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } specta = { workspace = true, features = ["uuid", "uhlc"] } diff --git a/crates/sync/src/crdt.rs b/crates/sync/src/crdt.rs index 97822e691..a8def9015 100644 --- a/crates/sync/src/crdt.rs +++ b/crates/sync/src/crdt.rs @@ -1,7 +1,6 @@ use std::fmt::Debug; use serde::{Deserialize, Serialize}; -use serde_json::Value; use specta::Type; use uhlc::NTP64; use uuid::Uuid; @@ -22,12 +21,16 @@ impl std::fmt::Display for OperationKind<'_> { } } -#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug, Type)] +#[derive(PartialEq, Serialize, Deserialize, Clone, Debug, Type)] pub enum CRDTOperationData { #[serde(rename = "c")] Create, #[serde(rename = "u")] - Update { field: String, value: Value }, + Update { + field: String, + #[specta(type = serde_json::Value)] + value: rmpv::Value, + }, #[serde(rename = "d")] Delete, } @@ -42,14 +45,15 @@ impl CRDTOperationData { } } -#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Type)] +#[derive(PartialEq, Serialize, Deserialize, Clone, Type)] pub struct CRDTOperation { pub instance: Uuid, #[specta(type = u32)] pub timestamp: NTP64, pub id: Uuid, pub model: String, - pub record_id: Value, + #[specta(type = serde_json::Value)] + pub record_id: rmpv::Value, pub data: CRDTOperationData, } diff --git a/crates/sync/src/factory.rs b/crates/sync/src/factory.rs index 959509313..5ddfc4bc1 100644 --- a/crates/sync/src/factory.rs +++ b/crates/sync/src/factory.rs @@ -1,5 +1,4 @@ use prisma_client_rust::ModelTypes; -use serde_json::{json, Value}; use uhlc::HLC; use uuid::Uuid; @@ -7,6 +6,12 @@ use crate::{ CRDTOperation, CRDTOperationData, RelationSyncId, RelationSyncModel, SharedSyncModel, SyncId, }; +macro_rules! msgpack { + ($e:expr) => { + ::rmpv::ext::to_value($e).expect("failed to serialize msgpack") + } +} + pub trait OperationFactory { fn get_clock(&self) -> &HLC; fn get_instance(&self) -> Uuid; @@ -23,7 +28,7 @@ pub trait OperationFactory { timestamp: *timestamp.get_time(), id: Uuid::new_v4(), model: TModel::MODEL.to_string(), - record_id: json!(id), + record_id: msgpack!(id), data, } } @@ -31,7 +36,7 @@ pub trait OperationFactory { fn shared_create, TModel: SharedSyncModel>( &self, id: TSyncId, - values: impl IntoIterator + 'static, + values: impl IntoIterator + 'static, ) -> Vec { [self.new_op(&id, CRDTOperationData::Create)] .into_iter() @@ -50,7 +55,7 @@ pub trait OperationFactory { &self, id: TSyncId, field: impl Into, - value: Value, + value: rmpv::Value, ) -> CRDTOperation { self.new_op( &id, @@ -70,7 +75,7 @@ pub trait OperationFactory { fn relation_create, TModel: RelationSyncModel>( &self, id: TSyncId, - values: impl IntoIterator + 'static, + values: impl IntoIterator + 'static, ) -> Vec { [self.new_op(&id, CRDTOperationData::Create)] .into_iter() @@ -89,7 +94,7 @@ pub trait OperationFactory { &self, id: TSyncId, field: impl Into, - value: Value, + value: rmpv::Value, ) -> CRDTOperation { self.new_op( &id, @@ -111,7 +116,7 @@ pub trait OperationFactory { macro_rules! sync_entry { ($v:expr, $($m:tt)*) => {{ let v = $v; - ($($m)*::NAME, serde_json::json!(&v)) + ($($m)*::NAME, ::rmpv::ext::to_value(&v).expect("failed to serialize msgpack")) }} } diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index ac6476088..6974b44ad 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -26,3 +26,13 @@ pub fn uuid_to_bytes(uuid: Uuid) -> Vec { pub fn from_bytes_to_uuid(bytes: &[u8]) -> Uuid { Uuid::from_slice(bytes).expect("corrupted uuid in database") } + +#[macro_export] +macro_rules! msgpack { + (null) => { + ::rmpv::Value::Nil + }; + ($e:expr) => { + ::rmpv::ext::to_value(&$e).expect("failed to serialize msgpack") + } +} diff --git a/interface/app/$libraryId/Explorer/View/GridView/Item/index.tsx b/interface/app/$libraryId/Explorer/View/GridView/Item/index.tsx index 0018bf968..2151d5dc7 100644 --- a/interface/app/$libraryId/Explorer/View/GridView/Item/index.tsx +++ b/interface/app/$libraryId/Explorer/View/GridView/Item/index.tsx @@ -162,9 +162,7 @@ function LabelItemCount({ data }: { data: Extract