switch sync to msgpack instead of json (#2164)

* switch sync to msgpack instead of json

* use rmpv

* more rmpv

* use msgpack in cloud

* receive operations as msgpack
This commit is contained in:
Brendan Allan 2024-03-06 18:15:29 +08:00 committed by GitHub
parent 04c5a7fef0
commit 63b17adb10
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 369 additions and 266 deletions

21
Cargo.lock generated
View file

@ -7353,6 +7353,8 @@ checksum = "2e0e0214a4a2b444ecce41a4025792fc31f77c7bb89c46d253953ea8c65701ec"
dependencies = [
"num-traits",
"rmp",
"serde",
"serde_bytes",
]
[[package]]
@ -7684,6 +7686,7 @@ dependencies = [
"prisma-client-rust",
"reqwest",
"rmp-serde",
"rmpv",
"sd-core-sync",
"sd-file-path-helper",
"sd-prisma",
@ -7726,6 +7729,7 @@ version = "0.1.0"
dependencies = [
"base64 0.21.7",
"reqwest",
"rmpv",
"rspc",
"sd-p2p2",
"serde",
@ -7779,6 +7783,7 @@ dependencies = [
"prisma-client-rust",
"regex",
"reqwest",
"rmp",
"rmp-serde",
"rmpv",
"rspc",
@ -7831,6 +7836,8 @@ name = "sd-core-sync"
version = "0.0.0"
dependencies = [
"prisma-client-rust",
"rmp-serde",
"rmpv",
"sd-prisma",
"sd-sync",
"sd-utils",
@ -8119,6 +8126,8 @@ name = "sd-prisma"
version = "0.1.0"
dependencies = [
"prisma-client-rust",
"rmp-serde",
"rmpv",
"sd-cache",
"sd-sync",
"serde",
@ -8144,6 +8153,9 @@ name = "sd-sync"
version = "0.1.0"
dependencies = [
"prisma-client-rust",
"rmp",
"rmp-serde",
"rmpv",
"serde",
"serde_json",
"specta",
@ -8318,6 +8330,15 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_bytes"
version = "0.11.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab33ec92f677585af6d88c65593ae2375adde54efdbf16d597f2cbc7a6d368ff"
dependencies = [
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.196"

View file

@ -1,18 +1,18 @@
[workspace]
resolver = "2"
members = [
"core",
"core/crates/*",
"crates/*",
# "crates/p2p/tunnel",
# "crates/p2p/tunnel/utils",
"apps/cli",
"apps/desktop/src-tauri",
"apps/desktop/crates/*",
"apps/mobile/modules/sd-core/core",
"apps/mobile/modules/sd-core/android/crate",
"apps/mobile/modules/sd-core/ios/crate",
"apps/server",
"core",
"core/crates/*",
"crates/*",
# "crates/p2p/tunnel",
# "crates/p2p/tunnel/utils",
"apps/cli",
"apps/desktop/src-tauri",
"apps/desktop/crates/*",
"apps/mobile/modules/sd-core/core",
"apps/mobile/modules/sd-core/android/crate",
"apps/mobile/modules/sd-core/ios/crate",
"apps/server",
]
[workspace.package]
@ -23,19 +23,19 @@ repository = "https://github.com/spacedriveapp/spacedrive"
[workspace.dependencies]
# First party dependencies
prisma-client-rust = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [
"rspc",
"sqlite-create-many",
"migrations",
"sqlite",
"rspc",
"sqlite-create-many",
"migrations",
"sqlite",
], default-features = false }
prisma-client-rust-cli = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [
"rspc",
"sqlite-create-many",
"migrations",
"sqlite",
"rspc",
"sqlite-create-many",
"migrations",
"sqlite",
], default-features = false }
prisma-client-rust-sdk = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [
"sqlite",
"sqlite",
], default-features = false }
tracing = "0.1.40"
@ -71,6 +71,7 @@ rand_chacha = "0.3.1"
regex = "1.10.2"
reqwest = "0.11.22"
rmp-serde = "1.1.2"
rmpv = { version = "^1.0.1", features = ["with-serde"] }
serde = "1.0"
serde_json = "1.0"
strum = "0.25"

View file

@ -23,18 +23,18 @@ sd-cache = { path = "../crates/cache" }
sd-core-sync = { path = "./crates/sync" }
# sd-cloud-api = { path = "../crates/cloud-api" }
sd-crypto = { path = "../crates/crypto", features = [
"rspc",
"specta",
"serde",
"keymanager",
"rspc",
"specta",
"serde",
"keymanager",
] }
sd-file-path-helper = { path = "../crates/file-path-helper" }
sd-ffmpeg = { path = "../crates/ffmpeg", optional = true }
sd-file-ext = { path = "../crates/file-ext" }
sd-images = { path = "../crates/images", features = [
"rspc",
"serde",
"specta",
"rspc",
"serde",
"specta",
] }
sd-media-metadata = { path = "../crates/media-metadata" }
sd-p2p2 = { path = "../crates/p2p2", features = ["specta"] }
@ -65,12 +65,12 @@ regex = { workspace = true }
reqwest = { workspace = true, features = ["json", "native-tls-vendored"] }
rmp-serde = { workspace = true }
rspc = { workspace = true, features = [
"axum",
"uuid",
"chrono",
"tracing",
"alpha",
"unstable",
"axum",
"uuid",
"chrono",
"tracing",
"alpha",
"unstable",
] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@ -80,12 +80,12 @@ strum_macros = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = [
"sync",
"rt-multi-thread",
"io-util",
"macros",
"time",
"process",
"sync",
"rt-multi-thread",
"io-util",
"macros",
"time",
"process",
] }
tokio-stream = { workspace = true, features = ["fs"] }
tokio-util = { workspace = true, features = ["io"] }
@ -111,10 +111,10 @@ int-enum = "0.5.0"
itertools = "0.12.0"
libc = "0.2.153"
mini-moka = "0.10.2"
notify = { git="https://github.com/notify-rs/notify.git", rev="c3929ed114fbb0bc7457a9a498260461596b00ca", default-features = false, features = [
"macos_fsevent",
notify = { git = "https://github.com/notify-rs/notify.git", rev = "c3929ed114fbb0bc7457a9a498260461596b00ca", default-features = false, features = [
"macos_fsevent",
] }
rmpv = "^1.0.1"
rmpv = { workspace = true }
serde-hashkey = "0.4.5"
serde_repr = "0.1"
serde_with = "3.4.0"
@ -129,6 +129,7 @@ base91 = "0.1.0"
sd-actors = { version = "0.1.0", path = "../crates/actors" }
tower-service = "0.3.2"
hyper = { version = "=0.14.28", features = ["http1", "server", "client"] }
rmp = "0.8.12"
# Override features of transitive dependencies
[dependencies.openssl]

View file

@ -18,3 +18,5 @@ tokio = { workspace = true }
uuid = { workspace = true }
tracing = { workspace = true }
uhlc = { workspace = true }
rmp-serde = "1.1.2"
rmpv = { workspace = true }

View file

@ -9,10 +9,15 @@ use sd_prisma::{
};
use sd_sync::{option_sync_entry, OperationFactory};
use sd_utils::chain_optional_iter;
use serde_json::json;
use crate::crdt_op_unchecked_db;
macro_rules! msgpack {
($e:expr) => {
::rmpv::ext::to_value($e).expect("failed to serialize msgpack")
}
}
pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, instance_id: i32) {
db._transaction()
.with_timeout(9999999999)
@ -254,12 +259,14 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
chain_optional_iter(
[],
[
t.name.map(|v| (tag::name::NAME, json!(v))),
t.color.map(|v| (tag::color::NAME, json!(v))),
t.date_created
.map(|v| (tag::date_created::NAME, json!(v))),
t.date_modified
.map(|v| (tag::date_modified::NAME, json!(v))),
t.name.map(|v| (tag::name::NAME, msgpack!(v))),
t.color.map(|v| (tag::color::NAME, msgpack!(v))),
t.date_created.map(|v| {
(tag::date_created::NAME, msgpack!(v))
}),
t.date_modified.map(|v| {
(tag::date_modified::NAME, msgpack!(v))
}),
],
),
)
@ -272,43 +279,53 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
)
.await?;
let tag_on_objects = db
.tag_on_object()
.find_many(vec![])
.include(tag_on_object::include!({
tag: select { pub_id }
object: select { pub_id }
}))
.exec()
.await?;
db.crdt_operation()
.create_many(
tag_on_objects
.into_iter()
.flat_map(|t_o| {
sync.relation_create(
prisma_sync::tag_on_object::SyncId {
tag: prisma_sync::tag::SyncId {
pub_id: t_o.tag.pub_id,
},
object: prisma_sync::object::SyncId {
pub_id: t_o.object.pub_id,
},
},
chain_optional_iter(
[],
[option_sync_entry!(
t_o.date_created,
tag_on_object::date_created
)],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await?;
paginate_relation(
|group_id, item_id| {
db.tag_on_object()
.find_many(vec![
tag_on_object::tag_id::gt(group_id),
tag_on_object::object_id::gt(item_id),
])
.order_by(tag_on_object::tag_id::order(SortOrder::Asc))
.order_by(tag_on_object::object_id::order(SortOrder::Asc))
.include(tag_on_object::include!({
tag: select { pub_id }
object: select { pub_id }
}))
.exec()
},
|t_o| (t_o.tag_id, t_o.object_id),
|tag_on_objects| {
db.crdt_operation()
.create_many(
tag_on_objects
.into_iter()
.flat_map(|t_o| {
sync.relation_create(
prisma_sync::tag_on_object::SyncId {
tag: prisma_sync::tag::SyncId {
pub_id: t_o.tag.pub_id,
},
object: prisma_sync::object::SyncId {
pub_id: t_o.object.pub_id,
},
},
chain_optional_iter(
[],
[option_sync_entry!(
t_o.date_created,
tag_on_object::date_created
)],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
},
)
.await?;
paginate(
|cursor| {
@ -327,8 +344,8 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
sync.shared_create(
prisma_sync::label::SyncId { name: l.name },
[
(label::date_created::NAME, json!(l.date_created)),
(label::date_modified::NAME, json!(l.date_modified)),
(label::date_created::NAME, msgpack!(l.date_created)),
(label::date_modified::NAME, msgpack!(l.date_modified)),
],
)
})
@ -340,39 +357,50 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
)
.await?;
let label_on_objects = db
.label_on_object()
.find_many(vec![])
.select(label_on_object::select!({
object: select { pub_id }
label: select { name }
}))
.exec()
.await?;
let res = db
.crdt_operation()
.create_many(
label_on_objects
.into_iter()
.flat_map(|l_o| {
sync.relation_create(
prisma_sync::label_on_object::SyncId {
label: prisma_sync::label::SyncId {
name: l_o.label.name,
},
object: prisma_sync::object::SyncId {
pub_id: l_o.object.pub_id,
},
},
[],
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await;
let res = paginate_relation(
|group_id, item_id| {
db.label_on_object()
.find_many(vec![
label_on_object::label_id::gt(group_id),
label_on_object::object_id::gt(item_id),
])
.order_by(label_on_object::label_id::order(SortOrder::Asc))
.order_by(label_on_object::object_id::order(SortOrder::Asc))
.include(label_on_object::include!({
object: select { pub_id }
label: select { name }
}))
.exec()
},
|l_o| (l_o.label_id, l_o.object_id),
|label_on_objects| {
db.crdt_operation()
.create_many(
label_on_objects
.into_iter()
.flat_map(|l_o| {
sync.relation_create(
prisma_sync::label_on_object::SyncId {
label: prisma_sync::label::SyncId {
name: l_o.label.name,
},
object: prisma_sync::object::SyncId {
pub_id: l_o.object.pub_id,
},
},
[],
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
},
)
.await;
println!("backfill ended");
res
})
.await
@ -389,7 +417,7 @@ async fn paginate<
id: impl Fn(&T) -> i32,
operations: impl Fn(Vec<T>) -> TOperations,
) -> Result<(), E> {
let mut next_cursor = Some(0);
let mut next_cursor = Some(-1);
loop {
let Some(cursor) = next_cursor else {
break;
@ -402,3 +430,27 @@ async fn paginate<
Ok(())
}
async fn paginate_relation<
T,
E: std::fmt::Debug,
TGetter: Future<Output = Result<Vec<T>, E>>,
TOperations: Future<Output = Result<i64, E>>,
>(
getter: impl Fn(i32, i32) -> TGetter,
id: impl Fn(&T) -> (i32, i32),
operations: impl Fn(Vec<T>) -> TOperations,
) -> Result<(), E> {
let mut next_cursor = Some((-1, -1));
loop {
let Some(cursor) = next_cursor else {
break;
};
let items = getter(cursor.0, cursor.1).await?;
next_cursor = items.last().map(&id);
operations(items).await?;
}
Ok(())
}

View file

@ -26,8 +26,7 @@ use sd_prisma::{
prisma_sync,
};
use sd_sync::OperationFactory;
use sd_utils::{db::maybe_missing, error::FileIOError};
use serde_json::json;
use sd_utils::{db::maybe_missing, error::FileIOError, msgpack};
use std::{
ffi::OsString,
@ -204,7 +203,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
pub_id: object.pub_id,
},
object::note::NAME,
json!(&args.note),
msgpack!(&args.note),
),
db.object().update(
object::id::equals(args.id),
@ -250,7 +249,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
pub_id: object.pub_id,
},
object::favorite::NAME,
json!(&args.favorite),
msgpack!(&args.favorite),
),
db.object().update(
object::id::equals(args.id),
@ -316,7 +315,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
sync.shared_update(
prisma_sync::object::SyncId { pub_id: d.pub_id },
object::date_accessed::NAME,
json!(date_accessed),
msgpack!(date_accessed),
),
d.id,
)
@ -359,7 +358,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
sync.shared_update(
prisma_sync::object::SyncId { pub_id: d.pub_id },
object::date_accessed::NAME,
json!(null),
msgpack!(null),
),
d.id,
)

View file

@ -6,7 +6,7 @@ use sd_prisma::{
prisma_sync,
};
use sd_sync::OperationFactory;
use sd_utils::uuid_to_bytes;
use sd_utils::{msgpack, uuid_to_bytes};
use std::collections::BTreeMap;
@ -14,7 +14,6 @@ use chrono::{DateTime, Utc};
use itertools::{Either, Itertools};
use rspc::{alpha::AlphaRouter, ErrorCode};
use serde::{Deserialize, Serialize};
use serde_json::json;
use specta::Type;
use uuid::Uuid;
@ -234,7 +233,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
pub_id: fp.pub_id.clone(),
},
file_path::object::NAME,
json!(id),
msgpack!(id),
));
(
@ -329,8 +328,8 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
db,
(
[
args.name.as_ref().map(|v| (tag::name::NAME, json!(v))),
args.color.as_ref().map(|v| (tag::color::NAME, json!(v))),
args.name.as_ref().map(|v| (tag::name::NAME, msgpack!(v))),
args.color.as_ref().map(|v| (tag::color::NAME, msgpack!(v))),
]
.into_iter()
.flatten()

View file

@ -1,6 +1,5 @@
use sd_sync::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::{atomic, Arc};
use tokio::sync::Notify;
use uuid::Uuid;
@ -80,7 +79,7 @@ macro_rules! err_break {
}
pub(crate) use err_break;
pub type CompressedCRDTOperationsForModel = Vec<(Value, Vec<CompressedCRDTOperation>)>;
pub type CompressedCRDTOperationsForModel = Vec<(rmpv::Value, Vec<CompressedCRDTOperation>)>;
#[derive(Serialize, Deserialize)]
pub struct CompressedCRDTOperations(Vec<(Uuid, Vec<(String, CompressedCRDTOperationsForModel)>)>);
@ -168,7 +167,7 @@ impl CompressedCRDTOperations {
}
}
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone)]
#[derive(PartialEq, Serialize, Deserialize, Clone)]
pub struct CompressedCRDTOperation {
pub timestamp: NTP64,
pub id: Uuid,

View file

@ -155,10 +155,9 @@ pub async fn run_actor(
e.insert(NTP64(0));
}
let compressed_operations: CompressedCRDTOperations =
err_break!(serde_json::from_slice(err_break!(
&BASE64_STANDARD.decode(collection.contents)
)));
let compressed_operations: CompressedCRDTOperations = err_break!(
rmp_serde::from_slice(err_break!(&BASE64_STANDARD.decode(collection.contents)))
);
err_break!(write_cloud_ops_to_db(compressed_operations.into_ops(), &db).await);

View file

@ -68,13 +68,17 @@ pub async fn run_actor(
let ops_len = ops.len();
use base64::prelude::*;
instances.push(do_add::Input {
uuid: req_add.instance_uuid,
key: req_add.key,
start_time,
end_time,
contents: serde_json::to_value(CompressedCRDTOperations::new(ops))
.expect("CompressedCRDTOperation should serialize!"),
contents: BASE64_STANDARD.encode(
rmp_serde::to_vec_named(&CompressedCRDTOperations::new(ops))
.expect("CompressedCRDTOperation should serialize!"),
),
ops_count: ops_len,
})
}

View file

@ -8,7 +8,7 @@ use sd_prisma::{
prisma_sync,
};
use sd_sync::*;
use sd_utils::{db::inode_to_db, error::FileIOError, from_bytes_to_uuid};
use sd_utils::{db::inode_to_db, error::FileIOError, from_bytes_to_uuid, msgpack};
use std::{collections::HashMap, path::Path};
@ -18,7 +18,6 @@ use itertools::Itertools;
use prisma_client_rust::operator::or;
use rspc::ErrorCode;
use serde::{Deserialize, Serialize};
use serde_json::json;
use thiserror::Error;
use tracing::{trace, warn};
@ -110,7 +109,7 @@ async fn execute_indexer_save_step(
(
(
location::NAME,
json!(prisma_sync::location::SyncId {
msgpack!(prisma_sync::location::SyncId {
pub_id: location.pub_id.clone()
}),
),
@ -195,11 +194,9 @@ async fn execute_indexer_update_step(
let (sync_params, db_params): (Vec<_>, Vec<_>) = [
// As this file was updated while Spacedrive was offline, we mark the object_id and cas_id as null
// So this file_path will be updated at file identifier job
should_unlink_object.then_some((
(object_id::NAME, serde_json::Value::Null),
object::disconnect(),
)),
Some(((cas_id::NAME, serde_json::Value::Null), cas_id::set(None))),
should_unlink_object
.then_some(((object_id::NAME, msgpack!(null)), object::disconnect())),
Some(((cas_id::NAME, msgpack!(null)), cas_id::set(None))),
Some(sync_db_entry!(*is_dir, is_dir)),
Some(sync_db_entry!(
entry.metadata.size_in_bytes.to_be_bytes().to_vec(),
@ -531,7 +528,7 @@ pub async fn reverse_update_directories_sizes(
pub_id: pub_id.clone(),
},
file_path::size_in_bytes_bytes::NAME,
json!(size_bytes.clone()),
msgpack!(size_bytes.clone()),
),
db.file_path().update(
file_path::pub_id::equals(pub_id),

View file

@ -18,7 +18,7 @@ use sd_prisma::{
prisma_sync,
};
use sd_sync::*;
use sd_utils::{db::maybe_missing, from_bytes_to_uuid};
use sd_utils::{db::maybe_missing, from_bytes_to_uuid, msgpack};
use std::{
collections::HashMap,
@ -599,7 +599,7 @@ async fn update_directories_sizes(
pub_id: file_path.pub_id.clone(),
},
file_path::size_in_bytes_bytes::NAME,
json!(size_bytes.clone()),
msgpack!(size_bytes.clone()),
),
db.file_path().update(
file_path::pub_id::equals(file_path.pub_id),

View file

@ -33,7 +33,7 @@ use sd_sync::OperationFactory;
use sd_utils::{
db::{inode_from_db, inode_to_db, maybe_missing},
error::FileIOError,
uuid_to_bytes,
msgpack, uuid_to_bytes,
};
#[cfg(target_family = "unix")]
@ -53,7 +53,6 @@ use std::{
use chrono::{DateTime, FixedOffset, Local, Utc};
use notify::Event;
use serde_json::json;
use tokio::{
fs,
io::{self, ErrorKind},
@ -273,8 +272,8 @@ async fn inner_create_file(
pub_id: pub_id.clone(),
},
[
(object::date_created::NAME, json!(date_created)),
(object::kind::NAME, json!(int_kind)),
(object::date_created::NAME, msgpack!(date_created)),
(object::kind::NAME, msgpack!(int_kind)),
],
),
db.object()
@ -298,7 +297,7 @@ async fn inner_create_file(
pub_id: created_file.pub_id.clone(),
},
file_path::object::NAME,
json!(prisma_sync::object::SyncId {
msgpack!(prisma_sync::object::SyncId {
pub_id: object_pub_id.clone()
}),
),
@ -475,13 +474,13 @@ async fn inner_update_file(
[
(
(cas_id::NAME, json!(file_path.cas_id)),
(cas_id::NAME, msgpack!(file_path.cas_id)),
Some(cas_id::set(file_path.cas_id.clone())),
),
(
(
size_in_bytes_bytes::NAME,
json!(fs_metadata.len().to_be_bytes().to_vec()),
msgpack!(fs_metadata.len().to_be_bytes().to_vec()),
),
Some(size_in_bytes_bytes::set(Some(
fs_metadata.len().to_be_bytes().to_vec(),
@ -491,7 +490,7 @@ async fn inner_update_file(
let date = DateTime::<Utc>::from(fs_metadata.modified_or_now()).into();
(
(date_modified::NAME, json!(date)),
(date_modified::NAME, msgpack!(date)),
Some(date_modified::set(Some(date))),
)
},
@ -509,28 +508,28 @@ async fn inner_update_file(
};
(
(integrity_checksum::NAME, json!(checksum)),
(integrity_checksum::NAME, msgpack!(checksum)),
Some(integrity_checksum::set(checksum)),
)
},
{
if current_inode != inode {
(
(inode::NAME, json!(inode)),
(inode::NAME, msgpack!(inode)),
Some(inode::set(Some(inode_to_db(inode)))),
)
} else {
((inode::NAME, serde_json::Value::Null), None)
((inode::NAME, msgpack!(null)), None)
}
},
{
if is_hidden != file_path.hidden.unwrap_or_default() {
(
(hidden::NAME, json!(inode)),
(hidden::NAME, msgpack!(inode)),
Some(hidden::set(Some(is_hidden))),
)
} else {
((hidden::NAME, serde_json::Value::Null), None)
((hidden::NAME, msgpack!(null)), None)
}
},
]
@ -582,7 +581,7 @@ async fn inner_update_file(
pub_id: object.pub_id.clone(),
},
object::kind::NAME,
json!(int_kind),
msgpack!(int_kind),
),
db.object().update(
object::id::equals(object.id),
@ -604,8 +603,8 @@ async fn inner_update_file(
pub_id: pub_id.clone(),
},
[
(object::date_created::NAME, json!(date_created)),
(object::kind::NAME, json!(int_kind)),
(object::date_created::NAME, msgpack!(date_created)),
(object::kind::NAME, msgpack!(int_kind)),
],
),
db.object().create(
@ -626,7 +625,7 @@ async fn inner_update_file(
pub_id: file_path.pub_id.clone(),
},
file_path::object::NAME,
json!(prisma_sync::object::SyncId {
msgpack!(prisma_sync::object::SyncId {
pub_id: pub_id.clone()
}),
),
@ -731,7 +730,7 @@ async fn inner_update_file(
pub_id: file_path.pub_id.clone(),
},
file_path::hidden::NAME,
json!(is_hidden),
msgpack!(is_hidden),
)],
db.file_path().update(
file_path::pub_id::equals(file_path.pub_id.clone()),
@ -828,7 +827,7 @@ pub(super) async fn rename(
sync.shared_update(
sd_prisma::prisma_sync::file_path::SyncId { pub_id },
file_path::materialized_path::NAME,
json!(&new_path),
msgpack!(&new_path),
),
db.file_path().update(
file_path::id::equals(id),
@ -851,24 +850,24 @@ pub(super) async fn rename(
(
(
file_path::materialized_path::NAME,
json!(new_path_materialized_str),
msgpack!(new_path_materialized_str),
),
file_path::materialized_path::set(Some(new_path_materialized_str)),
),
(
(file_path::name::NAME, json!(new_parts.name)),
(file_path::name::NAME, msgpack!(new_parts.name)),
file_path::name::set(Some(new_parts.name.to_string())),
),
(
(file_path::extension::NAME, json!(new_parts.extension)),
(file_path::extension::NAME, msgpack!(new_parts.extension)),
file_path::extension::set(Some(new_parts.extension.to_string())),
),
(
(file_path::date_modified::NAME, json!(&date_modified)),
(file_path::date_modified::NAME, msgpack!(&date_modified)),
file_path::date_modified::set(Some(date_modified)),
),
(
(file_path::hidden::NAME, json!(is_hidden)),
(file_path::hidden::NAME, msgpack!(is_hidden)),
file_path::hidden::set(Some(is_hidden)),
),
]

View file

@ -18,7 +18,7 @@ use sd_sync::*;
use sd_utils::{
db::{maybe_missing, MissingFieldError},
error::{FileIOError, NonUtf8PathError},
uuid_to_bytes,
msgpack, uuid_to_bytes,
};
use sd_file_path_helper::IsolatedFilePathDataParts;
@ -296,31 +296,31 @@ impl LocationUpdateArgs {
.filter(|name| location.name.as_ref() != Some(name))
.map(|v| {
(
(location::name::NAME, json!(v)),
(location::name::NAME, msgpack!(v)),
location::name::set(Some(v)),
)
}),
self.generate_preview_media.map(|v| {
(
(location::generate_preview_media::NAME, json!(v)),
(location::generate_preview_media::NAME, msgpack!(v)),
location::generate_preview_media::set(Some(v)),
)
}),
self.sync_preview_media.map(|v| {
(
(location::sync_preview_media::NAME, json!(v)),
(location::sync_preview_media::NAME, msgpack!(v)),
location::sync_preview_media::set(Some(v)),
)
}),
self.hidden.map(|v| {
(
(location::hidden::NAME, json!(v)),
(location::hidden::NAME, msgpack!(v)),
location::hidden::set(Some(v)),
)
}),
self.path.clone().map(|v| {
(
(location::path::NAME, json!(v)),
(location::path::NAME, msgpack!(v)),
location::path::set(Some(v)),
)
}),
@ -567,7 +567,7 @@ pub async fn relink_location(
pub_id: pub_id.clone(),
},
location::path::NAME,
json!(path),
msgpack!(path),
),
db.location().update(
location::pub_id::equals(pub_id.clone()),
@ -685,12 +685,12 @@ async fn create_location(
pub_id: location_pub_id.as_bytes().to_vec(),
},
[
(location::name::NAME, json!(&name)),
(location::path::NAME, json!(&path)),
(location::date_created::NAME, json!(date_created)),
(location::name::NAME, msgpack!(&name)),
(location::path::NAME, msgpack!(&path)),
(location::date_created::NAME, msgpack!(date_created)),
(
location::instance::NAME,
json!(prisma_sync::instance::SyncId {
msgpack!(prisma_sync::instance::SyncId {
pub_id: uuid_to_bytes(sync.instance)
}),
),
@ -1072,48 +1072,48 @@ pub async fn create_file_path(
(
(
location::NAME,
json!(prisma_sync::location::SyncId {
msgpack!(prisma_sync::location::SyncId {
pub_id: location.pub_id
}),
),
location::connect(prisma::location::id::equals(location.id)),
),
((cas_id::NAME, json!(cas_id)), cas_id::set(cas_id)),
((cas_id::NAME, msgpack!(cas_id)), cas_id::set(cas_id)),
(
(materialized_path::NAME, json!(materialized_path)),
(materialized_path::NAME, msgpack!(materialized_path)),
materialized_path::set(Some(materialized_path.into())),
),
((name::NAME, json!(name)), name::set(Some(name.into()))),
((name::NAME, msgpack!(name)), name::set(Some(name.into()))),
(
(extension::NAME, json!(extension)),
(extension::NAME, msgpack!(extension)),
extension::set(Some(extension.into())),
),
(
(
size_in_bytes_bytes::NAME,
json!(metadata.size_in_bytes.to_be_bytes().to_vec()),
msgpack!(metadata.size_in_bytes.to_be_bytes().to_vec()),
),
size_in_bytes_bytes::set(Some(metadata.size_in_bytes.to_be_bytes().to_vec())),
),
(
(inode::NAME, json!(metadata.inode.to_le_bytes())),
(inode::NAME, msgpack!(metadata.inode.to_le_bytes())),
inode::set(Some(inode_to_db(metadata.inode))),
),
((is_dir::NAME, json!(is_dir)), is_dir::set(Some(is_dir))),
((is_dir::NAME, msgpack!(is_dir)), is_dir::set(Some(is_dir))),
(
(date_created::NAME, json!(metadata.created_at)),
(date_created::NAME, msgpack!(metadata.created_at)),
date_created::set(Some(metadata.created_at.into())),
),
(
(date_modified::NAME, json!(metadata.modified_at)),
(date_modified::NAME, msgpack!(metadata.modified_at)),
date_modified::set(Some(metadata.modified_at.into())),
),
(
(date_indexed::NAME, json!(indexed_at)),
(date_indexed::NAME, msgpack!(indexed_at)),
date_indexed::set(Some(indexed_at.into())),
),
(
(hidden::NAME, json!(metadata.hidden)),
(hidden::NAME, msgpack!(metadata.hidden)),
hidden::set(Some(metadata.hidden)),
),
]

View file

@ -30,7 +30,7 @@ pub fn media_data_image_to_query(
pub fn media_data_image_to_query_params(
mdi: ImageMetadata,
) -> (Vec<(&'static str, serde_json::Value)>, Vec<SetParam>) {
) -> (Vec<(&'static str, rmpv::Value)>, Vec<SetParam>) {
use sd_sync::option_sync_db_entry;
use sd_utils::chain_optional_iter;

View file

@ -11,7 +11,7 @@ use sd_prisma::{
prisma_sync,
};
use sd_sync::{CRDTOperation, OperationFactory};
use sd_utils::{db::maybe_missing, error::FileIOError, uuid_to_bytes};
use sd_utils::{db::maybe_missing, error::FileIOError, msgpack, uuid_to_bytes};
use std::{
collections::{HashMap, HashSet},
@ -20,7 +20,6 @@ use std::{
};
use futures::future::join_all;
use serde_json::json;
use tokio::fs;
use tracing::{error, trace};
use uuid::Uuid;
@ -165,7 +164,7 @@ async fn identifier_job_step(
pub_id: sd_utils::uuid_to_bytes(*pub_id),
},
file_path::cas_id::NAME,
json!(&metadata.cas_id),
msgpack!(&metadata.cas_id),
),
db.file_path().update(
file_path::pub_id::equals(sd_utils::uuid_to_bytes(*pub_id)),
@ -279,11 +278,11 @@ async fn identifier_job_step(
let (sync_params, db_params): (Vec<_>, Vec<_>) = [
(
(object::date_created::NAME, json!(date_created)),
(object::date_created::NAME, msgpack!(date_created)),
object::date_created::set(*date_created),
),
(
(object::kind::NAME, json!(kind)),
(object::kind::NAME, msgpack!(kind)),
object::kind::set(Some(kind)),
),
]
@ -366,7 +365,7 @@ fn connect_file_path_to_object<'db>(
pub_id: sd_utils::uuid_to_bytes(file_path_id),
},
file_path::object::NAME,
json!(prisma_sync::object::SyncId {
msgpack!(prisma_sync::object::SyncId {
pub_id: vec_id.clone()
}),
),

View file

@ -5,8 +5,8 @@ use sd_sync::*;
use chrono::{DateTime, FixedOffset, Utc};
use sd_utils::msgpack;
use serde::Deserialize;
use serde_json::json;
use specta::Type;
use uuid::Uuid;
@ -34,10 +34,13 @@ impl TagCreateArgs {
pub_id: pub_id.clone(),
},
[
(tag::name::NAME, json!(&self.name)),
(tag::color::NAME, json!(&self.color)),
(tag::is_hidden::NAME, json!(false)),
(tag::date_created::NAME, json!(&date_created.to_rfc3339())),
(tag::name::NAME, msgpack!(&self.name)),
(tag::color::NAME, msgpack!(&self.color)),
(tag::is_hidden::NAME, msgpack!(false)),
(
tag::date_created::NAME,
msgpack!(&date_created.to_rfc3339()),
),
],
),
db.tag().create(

View file

@ -14,7 +14,7 @@ use sd_prisma::{
prisma_sync,
};
use sd_sync::OperationFactory;
use sd_utils::{db::maybe_missing, error::FileIOError};
use sd_utils::{db::maybe_missing, error::FileIOError, msgpack};
use std::{
hash::{Hash, Hasher},
@ -162,7 +162,7 @@ impl StatefulJob for OldObjectValidatorJobInit {
pub_id: file_path.pub_id.clone(),
},
file_path::integrity_checksum::NAME,
json!(&checksum),
msgpack!(&checksum),
),
db.file_path().update(
file_path::pub_id::equals(file_path.pub_id.clone()),

View file

@ -69,7 +69,7 @@ mod originator {
instance: Uuid::new_v4(),
timestamp: sync::NTP64(0),
id: Uuid::new_v4(),
record_id: serde_json::Value::Null,
record_id: rmpv::Value::Nil,
model: "name".to_string(),
data: sd_sync::CRDTOperationData::Create,
}]);

View file

@ -37,6 +37,7 @@ uuid = { workspace = true, features = ["v4", "serde"] }
# Note: Keep same version as used by ort
ndarray = { version = "0.15.6" }
half = { version = "2.1", features = ['num-traits'] }
rmpv.workspace = true
# Microsoft does not provide a release for osx-gpu. See: https://github.com/microsoft/onnxruntime/releases
# "gpu" means CUDA or TensorRT EP. Thus, the ort crate cannot download them at build time.

View file

@ -4,7 +4,7 @@ use sd_prisma::{
prisma_sync,
};
use sd_sync::OperationFactory;
use sd_utils::{db::MissingFieldError, error::FileIOError};
use sd_utils::{db::MissingFieldError, error::FileIOError, msgpack};
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
@ -16,7 +16,6 @@ use async_channel as chan;
use chrono::{DateTime, FixedOffset, Utc};
use futures_concurrency::future::{Join, Race};
use image::ImageFormat;
use serde_json::json;
use tokio::{
fs, spawn,
sync::{oneshot, OwnedRwLockReadGuard, OwnedSemaphorePermit, RwLock, Semaphore},
@ -432,7 +431,7 @@ pub async fn assign_labels(
.map(|name| {
sync_params.extend(sync.shared_create(
prisma_sync::label::SyncId { name: name.clone() },
[(label::date_created::NAME, json!(&date_created))],
[(label::date_created::NAME, msgpack!(&date_created))],
));
db.label()

View file

@ -15,3 +15,4 @@ uuid.workspace = true
rspc = { workspace = true }
specta.workspace = true
base64.workspace = true
rmpv.workspace = true

View file

@ -450,7 +450,7 @@ pub mod library {
pub key: String,
pub start_time: String,
pub end_time: String,
pub contents: serde_json::Value,
pub contents: String,
pub ops_count: usize,
}

View file

@ -10,3 +10,5 @@ sd-sync = { path = "../sync" }
prisma-client-rust = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
rmp-serde = "1.1.2"
rmpv.workspace = true

View file

@ -79,7 +79,6 @@ impl<'a> ModelSyncType<'a> {
group: get_field("group"),
}
}
// "owned" => Self::Owned { id },
_ => return None,
})

View file

@ -71,41 +71,47 @@ pub fn module((model, sync_type): ModelWithSyncType) -> Module {
let set_param_impl = {
let field_matches = model.fields().filter_map(|field| {
let field_name_snake = snake_ident(field.name());
let field_name_snake = snake_ident(field.name());
match field.refine() {
RefinedFieldWalker::Scalar(scalar_field) => {
(!scalar_field.is_in_required_relation()).then(|| quote! {
#model_name_snake::#field_name_snake::set(::serde_json::from_value(val).unwrap()),
})
},
RefinedFieldWalker::Relation(relation_field) => {
let relation_model_name_snake = snake_ident(relation_field.related_model().name());
match relation_field.referenced_fields() {
Some(i) => {
if i.count() == 1 {
Some(quote! {{
let val: std::collections::HashMap<String, ::serde_json::Value> = ::serde_json::from_value(val).unwrap();
let val = val.into_iter().next().unwrap();
#model_name_snake::#field_name_snake::connect(
#relation_model_name_snake::UniqueWhereParam::deserialize(&val.0, val.1).unwrap()
)
}})
} else { None }
},
_ => None
match field.refine() {
RefinedFieldWalker::Scalar(scalar_field) => {
(!scalar_field.is_in_required_relation()).then(|| {
quote! {
#model_name_snake::#field_name_snake::set(::rmpv::ext::from_value(val).unwrap()),
}
},
}.map(|body| quote!(#model_name_snake::#field_name_snake::NAME => #body))
});
})
}
RefinedFieldWalker::Relation(relation_field) => {
let relation_model_name_snake =
snake_ident(relation_field.related_model().name());
match relation_field.referenced_fields() {
Some(i) => {
if i.count() == 1 {
Some(quote! {{
let val: std::collections::HashMap<String, rmpv::Value> = ::rmpv::ext::from_value(val).unwrap();
let val = val.into_iter().next().unwrap();
#model_name_snake::#field_name_snake::connect(
#relation_model_name_snake::UniqueWhereParam::deserialize(&val.0, val.1).unwrap()
)
}})
} else {
None
}
}
_ => None,
}
}
}
.map(|body| quote!(#model_name_snake::#field_name_snake::NAME => #body))
});
match field_matches.clone().count() {
0 => quote!(),
_ => quote! {
impl #model_name_snake::SetParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
pub fn deserialize(field: &str, val: ::rmpv::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None
@ -125,7 +131,7 @@ pub fn module((model, sync_type): ModelWithSyncType) -> Module {
Some(quote!(#model_name_snake::#field_name_snake::NAME =>
#model_name_snake::#field_name_snake::equals(
::serde_json::from_value(val).unwrap()
::rmpv::ext::from_value(val).unwrap()
),
))
}
@ -137,7 +143,7 @@ pub fn module((model, sync_type): ModelWithSyncType) -> Module {
0 => quote!(),
_ => quote! {
impl #model_name_snake::UniqueWhereParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
pub fn deserialize(field: &str, val: ::rmpv::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None

View file

@ -25,7 +25,7 @@ pub fn r#enum(models: Vec<ModelWithSyncType>) -> TokenStream {
quote!(#model_name_pascal(#model_name_snake::SyncId, sd_sync::CRDTOperationData)),
quote! {
prisma::#model_name_snake::NAME =>
Self::#model_name_pascal(serde_json::from_value(op.record_id).ok()?, op.data)
Self::#model_name_pascal(rmpv::ext::from_value(op.record_id).ok()?, op.data)
},
)
})

View file

@ -7,6 +7,9 @@ edition = { workspace = true }
[dependencies]
prisma-client-rust = { workspace = true }
rmp = "0.8.12"
rmp-serde = "1.1.2"
rmpv = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
specta = { workspace = true, features = ["uuid", "uhlc"] }

View file

@ -1,7 +1,6 @@
use std::fmt::Debug;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use specta::Type;
use uhlc::NTP64;
use uuid::Uuid;
@ -22,12 +21,16 @@ impl std::fmt::Display for OperationKind<'_> {
}
}
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug, Type)]
#[derive(PartialEq, Serialize, Deserialize, Clone, Debug, Type)]
pub enum CRDTOperationData {
#[serde(rename = "c")]
Create,
#[serde(rename = "u")]
Update { field: String, value: Value },
Update {
field: String,
#[specta(type = serde_json::Value)]
value: rmpv::Value,
},
#[serde(rename = "d")]
Delete,
}
@ -42,14 +45,15 @@ impl CRDTOperationData {
}
}
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Type)]
#[derive(PartialEq, Serialize, Deserialize, Clone, Type)]
pub struct CRDTOperation {
pub instance: Uuid,
#[specta(type = u32)]
pub timestamp: NTP64,
pub id: Uuid,
pub model: String,
pub record_id: Value,
#[specta(type = serde_json::Value)]
pub record_id: rmpv::Value,
pub data: CRDTOperationData,
}

View file

@ -1,5 +1,4 @@
use prisma_client_rust::ModelTypes;
use serde_json::{json, Value};
use uhlc::HLC;
use uuid::Uuid;
@ -7,6 +6,12 @@ use crate::{
CRDTOperation, CRDTOperationData, RelationSyncId, RelationSyncModel, SharedSyncModel, SyncId,
};
macro_rules! msgpack {
($e:expr) => {
::rmpv::ext::to_value($e).expect("failed to serialize msgpack")
}
}
pub trait OperationFactory {
fn get_clock(&self) -> &HLC;
fn get_instance(&self) -> Uuid;
@ -23,7 +28,7 @@ pub trait OperationFactory {
timestamp: *timestamp.get_time(),
id: Uuid::new_v4(),
model: TModel::MODEL.to_string(),
record_id: json!(id),
record_id: msgpack!(id),
data,
}
}
@ -31,7 +36,7 @@ pub trait OperationFactory {
fn shared_create<TSyncId: SyncId<Model = TModel>, TModel: SharedSyncModel>(
&self,
id: TSyncId,
values: impl IntoIterator<Item = (&'static str, Value)> + 'static,
values: impl IntoIterator<Item = (&'static str, rmpv::Value)> + 'static,
) -> Vec<CRDTOperation> {
[self.new_op(&id, CRDTOperationData::Create)]
.into_iter()
@ -50,7 +55,7 @@ pub trait OperationFactory {
&self,
id: TSyncId,
field: impl Into<String>,
value: Value,
value: rmpv::Value,
) -> CRDTOperation {
self.new_op(
&id,
@ -70,7 +75,7 @@ pub trait OperationFactory {
fn relation_create<TSyncId: RelationSyncId<Model = TModel>, TModel: RelationSyncModel>(
&self,
id: TSyncId,
values: impl IntoIterator<Item = (&'static str, Value)> + 'static,
values: impl IntoIterator<Item = (&'static str, rmpv::Value)> + 'static,
) -> Vec<CRDTOperation> {
[self.new_op(&id, CRDTOperationData::Create)]
.into_iter()
@ -89,7 +94,7 @@ pub trait OperationFactory {
&self,
id: TSyncId,
field: impl Into<String>,
value: Value,
value: rmpv::Value,
) -> CRDTOperation {
self.new_op(
&id,
@ -111,7 +116,7 @@ pub trait OperationFactory {
macro_rules! sync_entry {
($v:expr, $($m:tt)*) => {{
let v = $v;
($($m)*::NAME, serde_json::json!(&v))
($($m)*::NAME, ::rmpv::ext::to_value(&v).expect("failed to serialize msgpack"))
}}
}

View file

@ -26,3 +26,13 @@ pub fn uuid_to_bytes(uuid: Uuid) -> Vec<u8> {
pub fn from_bytes_to_uuid(bytes: &[u8]) -> Uuid {
Uuid::from_slice(bytes).expect("corrupted uuid in database")
}
#[macro_export]
macro_rules! msgpack {
(null) => {
::rmpv::Value::Nil
};
($e:expr) => {
::rmpv::ext::to_value(&$e).expect("failed to serialize msgpack")
}
}

View file

@ -162,9 +162,7 @@ function LabelItemCount({ data }: { data: Extract<ExplorerItem, { type: 'Label'
{
filters: [
{
object: {
labels: {
in: [data.item.id]
object: { labels: { in: [data.item.id]
}
}
}