fix cloud sync and operations (#2218)

* fix cloud sync and operations

* remove some logs

* cleanup
Brendan Allan 2024-03-18 16:51:16 +08:00 committed by GitHub
parent edffe46536
commit 3afc3bd34f
27 changed files with 240 additions and 193 deletions

Cargo.lock (generated)

@@ -8173,6 +8173,7 @@ dependencies = [
"serde_json",
"specta",
"thiserror",
"tracing",
"uuid",
]

@@ -1 +1 @@
/sdserver_data/
/sdserver_data*

@@ -16,7 +16,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }
tracing = { workspace = true }
tracing.workspace = true
uhlc = { workspace = true }
rmp-serde = "1.1.2"
rmpv = { workspace = true }

@@ -2,22 +2,16 @@ use std::future::Future;
use sd_prisma::{
prisma::{
file_path, label, label_on_object, location, media_data, object, tag, tag_on_object,
PrismaClient, SortOrder,
crdt_operation, file_path, label, label_on_object, location, media_data, object, tag,
tag_on_object, PrismaClient, SortOrder,
},
prisma_sync,
};
use sd_sync::{option_sync_entry, OperationFactory};
use sd_utils::chain_optional_iter;
use sd_utils::{chain_optional_iter, msgpack};
use crate::crdt_op_unchecked_db;
macro_rules! msgpack {
($e:expr) => {
::rmpv::ext::to_value($e).expect("failed to serialize msgpack")
}
}
/// Takes all the syncable data in the database and generates CRDTOperations for it.
/// This is a requirement before the library can sync.
pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, instance_id: i32) {
@@ -25,7 +19,48 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
.with_timeout(9999999999)
.run(|db| async move {
println!("backfill started");
db.crdt_operation().delete_many(vec![]).exec().await?;
db.crdt_operation()
.delete_many(vec![crdt_operation::instance_id::equals(instance_id)])
.exec()
.await?;
paginate(
|cursor| {
db.tag()
.find_many(vec![tag::id::gt(cursor)])
.order_by(tag::id::order(SortOrder::Asc))
.exec()
},
|tag| tag.id,
|tags| {
db.crdt_operation()
.create_many(
tags.into_iter()
.flat_map(|t| {
sync.shared_create(
prisma_sync::tag::SyncId { pub_id: t.pub_id },
chain_optional_iter(
[],
[
t.name.map(|v| (tag::name::NAME, msgpack!(v))),
t.color.map(|v| (tag::color::NAME, msgpack!(v))),
t.date_created.map(|v| {
(tag::date_created::NAME, msgpack!(v))
}),
t.date_modified.map(|v| {
(tag::date_modified::NAME, msgpack!(v))
}),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
},
)
.await?;
paginate(
|cursor| {
@@ -216,6 +251,14 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
}),
location
),
option_sync_entry!(
fp.object.map(|o| {
prisma_sync::object::SyncId {
pub_id: o.pub_id,
}
}),
object
),
option_sync_entry!(
fp.materialized_path,
materialized_path
@@ -243,44 +286,6 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
)
.await?;
paginate(
|cursor| {
db.tag()
.find_many(vec![tag::id::gt(cursor)])
.order_by(tag::id::order(SortOrder::Asc))
.exec()
},
|tag| tag.id,
|tags| {
db.crdt_operation()
.create_many(
tags.into_iter()
.flat_map(|t| {
sync.shared_create(
prisma_sync::tag::SyncId { pub_id: t.pub_id },
chain_optional_iter(
[],
[
t.name.map(|v| (tag::name::NAME, msgpack!(v))),
t.color.map(|v| (tag::color::NAME, msgpack!(v))),
t.date_created.map(|v| {
(tag::date_created::NAME, msgpack!(v))
}),
t.date_modified.map(|v| {
(tag::date_modified::NAME, msgpack!(v))
}),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
},
)
.await?;
paginate_relation(
|group_id, item_id| {
db.tag_on_object()

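The rewritten backfill funnels every model through this `paginate` helper: a page query keyed on an id cursor, a cursor extractor, and a batch writer. The helper's own definition isn't part of this diff, so the following is only a minimal sketch of the pattern with simplified types (page limiting is assumed to live inside the query closure, and -1 is an assumed starting cursor):

use std::future::Future;

// Hypothetical sketch of the cursor-paginated loop driving the backfill
// above. The real helper works against prisma-client-rust queries; here the
// types are simplified so the sketch stands alone.
async fn paginate<T, E, GetFut, OpFut>(
    getter: impl Fn(i32) -> GetFut,      // fetch the next page of rows with id > cursor
    id: impl Fn(&T) -> i32,              // extract the cursor column from a row
    operation: impl Fn(Vec<T>) -> OpFut, // write CRDT operations for one page
) -> Result<(), E>
where
    GetFut: Future<Output = Result<Vec<T>, E>>,
    OpFut: Future<Output = Result<i64, E>>,
{
    let mut cursor = -1; // assumed sentinel: the first query matches every id
    loop {
        let rows = getter(cursor).await?;
        let Some(last) = rows.last() else { break }; // empty page: backfill done
        cursor = id(last); // advance past the last row we saw
        operation(rows).await?;
    }
    Ok(())
}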
@@ -17,35 +17,17 @@ impl crdt_include::Data {
NTP64(self.timestamp as u64)
}
pub fn id(&self) -> Uuid {
Uuid::from_slice(&self.id).unwrap()
}
pub fn instance(&self) -> Uuid {
Uuid::from_slice(&self.instance.pub_id).unwrap()
}
pub fn into_operation(self) -> CRDTOperation {
CRDTOperation {
id: self.id(),
instance: self.instance(),
timestamp: self.timestamp(),
record_id: rmp_serde::from_slice(&self.record_id).unwrap(),
model: self.model,
data: rmp_serde::from_slice(&self.data).unwrap(),
// match self {
// Self::Shared(op) => CRDTOperationType::Shared(SharedOperation {
// record_id: serde_json::from_slice(&op.record_id).unwrap(),
// model: op.model,
// data: serde_json::from_slice(&op.data).unwrap(),
// }),
// Self::Relation(op) => CRDTOperationType::Relation(RelationOperation {
// relation: op.relation,
// data: serde_json::from_slice(&op.data).unwrap(),
// relation_item: serde_json::from_slice(&op.item_id).unwrap(),
// relation_group: serde_json::from_slice(&op.group_id).unwrap(),
// }),
// },
}
}
}
@@ -55,31 +37,17 @@ impl cloud_crdt_include::Data {
NTP64(self.timestamp as u64)
}
pub fn id(&self) -> Uuid {
Uuid::from_slice(&self.id).unwrap()
}
pub fn instance(&self) -> Uuid {
Uuid::from_slice(&self.instance.pub_id).unwrap()
}
pub fn into_operation(self) -> CRDTOperation {
CRDTOperation {
id: self.id(),
instance: self.instance(),
timestamp: self.timestamp(),
record_id: rmp_serde::from_slice(&self.record_id).unwrap(),
model: self.model,
data: serde_json::from_slice(&self.data).unwrap(),
// match self {
// Self::Shared(op) => ,
// Self::Relation(op) => CRDTOperationType::Relation(RelationOperation {
// relation: op.relation,
// data: serde_json::from_slice(&op.data).unwrap(),
// relation_item: serde_json::from_slice(&op.item_id).unwrap(),
// relation_group: serde_json::from_slice(&op.group_id).unwrap(),
// }),
// },
}
}
}
@@ -95,7 +63,6 @@ pub async fn write_crdt_op_to_db(
fn crdt_op_db(op: &CRDTOperation) -> crdt_operation::Create {
crdt_operation::Create {
id: op.id.as_bytes().to_vec(),
timestamp: op.timestamp.0 as i64,
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
kind: op.kind().to_string(),

@@ -6,6 +6,7 @@ use sd_prisma::{
};
use sd_sync::CRDTOperation;
use tokio::sync::{mpsc, oneshot, Mutex};
use tracing::debug;
use uhlc::{Timestamp, NTP64};
use uuid::Uuid;
@@ -83,19 +84,31 @@ impl Actor {
loop {
tokio::select! {
res = &mut rx => {
if let Err(_) = res { break State::WaitingForNotification }
},
biased;
res = self.io.event_rx.recv() => {
if let Some(Event::Messages(event)) = res { break State::Ingesting(event) }
}
res = &mut rx => {
if let Err(_) = res {
debug!("messages request ignored");
break State::WaitingForNotification
}
},
}
}
}
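The `biased;` line is the substance of this change: by default `tokio::select!` polls its branches in random order, so the shutdown oneshot could preempt ready messages, while with `biased;` branches are polled top to bottom and pending messages always drain first. A minimal illustration, not Spacedrive code (assumes tokio with the `full` feature set):

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(1);
    tx.send(1).await.unwrap();

    tokio::select! {
        biased; // poll branches strictly top to bottom instead of randomly
        Some(msg) = rx.recv() => println!("ready messages always win: {msg}"),
        _ = tokio::time::sleep(std::time::Duration::ZERO) => println!("unreachable while a message is queued"),
    }
}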
State::Ingesting(event) => {
for op in event.messages {
let fut = self.receive_crdt_operation(op);
fut.await;
if event.messages.len() > 0 {
debug!(
"ingesting {} operations: {} to {}",
event.messages.len(),
event.messages.first().unwrap().timestamp.as_u64(),
event.messages.last().unwrap().timestamp.as_u64(),
);
for op in event.messages {
self.receive_crdt_operation(op).await;
}
}
match event.has_more {
@@ -148,7 +161,7 @@ impl Actor {
.expect("timestamp has too much drift!");
// read the timestamp for the operation's instance, or insert one if it doesn't exist
let timestamp = self.timestamps.write().await.get(&op.instance).cloned();
let timestamp = self.timestamps.read().await.get(&op.instance).cloned();
// copy some fields because of rust ownership
let op_instance = op.instance;
@@ -169,12 +182,12 @@
async fn apply_op(&mut self, op: CRDTOperation) -> prisma_client_rust::Result<()> {
self.db
._transaction()
.with_timeout(30 * 1000)
.run(|db| async move {
// apply the operation to the actual record
ModelSyncData::from_op(op.clone())
.unwrap()
.exec(&db)
.await?;
let sync_data = ModelSyncData::from_op(op.clone());
sync_data.unwrap().exec(&db).await?;
// write the operation to the operations table
write_crdt_op_to_db(&op, &db).await?;

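Switching that timestamp lookup from `write()` to `read()` avoids taking an exclusive lock for what is only a read; the timestamps map is shared with the cloud actors, which also read it. The fetched value feeds the actor's staleness check: an operation can be skipped when an equal-or-newer timestamp is already recorded for its instance. A standalone sketch of that comparison (assumed names, not the exact code; assumes the `uuid` and `uhlc` crates):

use std::collections::HashMap;

use uhlc::NTP64;
use uuid::Uuid;

// An operation is stale if we already hold a timestamp at or past its own
// for that instance; an unseen instance is never stale.
fn is_old(timestamps: &HashMap<Uuid, NTP64>, instance: Uuid, timestamp: NTP64) -> bool {
    timestamps
        .get(&instance)
        .map(|last| *last >= timestamp)
        .unwrap_or(false)
}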
@@ -37,7 +37,6 @@ pub struct SharedState {
#[must_use]
pub fn crdt_op_db(op: &CRDTOperation) -> crdt_operation::Create {
crdt_operation::Create {
id: op.id.as_bytes().to_vec(),
timestamp: op.timestamp.0 as i64,
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
kind: op.kind().to_string(),
@@ -54,7 +53,6 @@ pub fn crdt_op_unchecked_db(
instance_id: i32,
) -> crdt_operation::CreateUnchecked {
crdt_operation::CreateUnchecked {
id: op.id.as_bytes().to_vec(),
timestamp: op.timestamp.0 as i64,
instance_id,
kind: op.kind().to_string(),

@@ -113,14 +113,38 @@ impl Manager {
Ok(ret)
}
pub async fn get_instance_ops(
&self,
count: u32,
instance_uuid: Uuid,
timestamp: NTP64,
) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
let db = &self.db;
Ok(db
.crdt_operation()
.find_many(vec![
crdt_operation::instance::is(vec![instance::pub_id::equals(uuid_to_bytes(
instance_uuid,
))]),
crdt_operation::timestamp::gt(timestamp.as_u64() as i64),
])
.take(i64::from(count))
.order_by(crdt_operation::timestamp::order(SortOrder::Asc))
.include(crdt_include::include())
.exec()
.await?
.into_iter()
.map(|o| o.into_operation())
.collect())
}
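`get_instance_ops` is the new single-instance counterpart to `get_ops`: rather than OR-ing clock filters for every instance into one query, callers ask for one instance's operations after a timestamp. A plausible call shape, with the `sync` handle and values assumed (cloud/send.rs below uses it this way):

// Up to 1000 ops authored by `instance_uuid`, strictly newer than `since`,
// oldest first — ready to be compressed and pushed to the cloud.
let since = NTP64(0);
let ops = sync.get_instance_ops(1000, instance_uuid, since).await?;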
pub async fn get_ops(
&self,
args: GetOpsArgs,
) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
let db = &self.db;
dbg!(&args);
macro_rules! db_args {
($args:ident, $op:ident) => {
vec![prisma_client_rust::operator::or(

@@ -17,7 +17,8 @@ generator sync {
}
model CRDTOperation {
id Bytes @id
id Int @id @default(autoincrement())
timestamp BigInt
model String
@@ -29,8 +30,6 @@ model CRDTOperation {
instance_id Int
instance Instance @relation(fields: [instance_id], references: [id])
// attestation Bytes
@@map("crdt_operation")
}
@@ -533,7 +532,8 @@ model SavedSearch {
/// @local(id: id)
model CloudCRDTOperation {
id Bytes @id
id Int @id @default(autoincrement())
timestamp BigInt
model String

@@ -119,6 +119,7 @@ mod library {
false,
None,
&node,
true,
)
.await?;
node.libraries

@@ -358,7 +358,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
sync.shared_update(
prisma_sync::object::SyncId { pub_id: d.pub_id },
object::date_accessed::NAME,
msgpack!(null),
msgpack!(nil),
),
d.id,
)

@@ -1,6 +1,6 @@
use std::sync::Arc;
use tokio::sync::Notify;
use tracing::info;
use tracing::debug;
use crate::cloud::sync::err_break;
@@ -25,7 +25,9 @@ pub async fn run_actor(sync: Arc<sd_core_sync::Manager>, notify: Arc<Notify>) {
use sd_core_sync::*;
let timestamps = match req {
Request::FinishedIngesting => break,
Request::FinishedIngesting => {
break;
}
Request::Messages { timestamps, .. } => timestamps,
_ => continue,
};
@@ -38,14 +40,23 @@ pub async fn run_actor(sync: Arc<sd_core_sync::Manager>, notify: Arc<Notify>) {
.await
);
info!("Got {} cloud ops to ingest", ops.len());
if ops.is_empty() {
break;
}
debug!(
"Sending {} messages ({} to {}) to ingester",
ops.len(),
ops.first().unwrap().timestamp.as_u64(),
ops.last().unwrap().timestamp.as_u64(),
);
err_break!(
sync.ingest
.event_tx
.send(sd_core_sync::Event::Messages(MessagesEvent {
instance_id: sync.instance,
has_more: ops.len() == 1000,
has_more: ops.len() == OPS_PER_REQUEST as usize,
messages: ops,
}))
.await

@@ -10,7 +10,7 @@ use sd_p2p::{IdentityOrRemoteIdentity, RemoteIdentity};
use sd_prisma::prisma::{cloud_crdt_operation, instance, PrismaClient, SortOrder};
use sd_sync::CRDTOperation;
use sd_utils::uuid_to_bytes;
use tracing::info;
use tracing::{debug, info};
use std::{
collections::{hash_map::Entry, HashMap},
@@ -40,10 +40,12 @@ pub async fn run_actor(
) {
loop {
loop {
// We need to know the latest operations we should be retrieving
let mut cloud_timestamps = {
let timestamps = sync.timestamps.read().await;
err_break!(
// looks up the most recent operation we've received (not ingested!) for each instance
let db_timestamps = err_break!(
db._batch(
timestamps
.keys()
@@ -59,24 +61,32 @@ pub async fn run_actor(
.collect::<Vec<_>>()
)
.await
)
.into_iter()
.zip(timestamps.iter())
.map(|(d, (id, sync_timestamp))| {
let cloud_timestamp = NTP64(d.map(|d| d.timestamp).unwrap_or_default() as u64);
);
let max_timestamp = Ord::max(cloud_timestamp, *sync_timestamp);
// compares the latest ingested timestamp with the latest received timestamp
// and picks the highest one for each instance
let mut cloud_timestamps = db_timestamps
.into_iter()
.zip(timestamps.iter())
.map(|(d, (id, sync_timestamp))| {
let cloud_timestamp = d.map(|d| d.timestamp).unwrap_or_default() as u64;
(*id, max_timestamp)
})
.collect::<HashMap<_, _>>()
debug!(
"Instance {id}, Sync Timestamp {}, Cloud Timestamp {cloud_timestamp}",
sync_timestamp.as_u64()
);
let max_timestamp = Ord::max(cloud_timestamp, sync_timestamp.as_u64());
(*id, max_timestamp)
})
.collect::<HashMap<_, _>>();
cloud_timestamps.remove(&instance_uuid);
cloud_timestamps
};
info!(
"Fetched timestamps for {} local instances",
cloud_timestamps.len()
);
let instance_timestamps = sync
.timestamps
.read()
@@ -87,9 +97,8 @@ pub async fn run_actor(
instance_uuid: *uuid,
from_time: cloud_timestamps
.get(uuid)
.cloned()
.copied()
.unwrap_or_default()
.as_u64()
.to_string(),
},
)
@@ -158,17 +167,26 @@ pub async fn run_actor(
.await
);
e.insert(NTP64(0));
e.insert(0);
}
let compressed_operations: CompressedCRDTOperations = err_break!(
rmp_serde::from_slice(err_break!(&BASE64_STANDARD.decode(collection.contents)))
);
err_break!(write_cloud_ops_to_db(compressed_operations.into_ops(), &db).await);
let operations = compressed_operations.into_ops();
let collection_timestamp =
NTP64(collection.end_time.parse().expect("unable to parse time"));
debug!(
"Processing collection. Instance {}, Start {}, End {}",
&collection.instance_uuid,
operations.first().unwrap().timestamp.as_u64(),
operations.last().unwrap().timestamp.as_u64(),
);
err_break!(write_cloud_ops_to_db(operations, &db).await);
let collection_timestamp: u64 =
collection.end_time.parse().expect("unable to parse time");
let timestamp = cloud_timestamps
.entry(collection.instance_uuid)
@@ -198,7 +216,6 @@ async fn write_cloud_ops_to_db(
fn crdt_op_db(op: &CRDTOperation) -> cloud_crdt_operation::Create {
cloud_crdt_operation::Create {
id: op.id.as_bytes().to_vec(),
timestamp: op.timestamp.0 as i64,
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
kind: op.data.as_kind().to_string(),

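Taken together, the receiver now derives a resume point per instance: the maximum of the newest ingested timestamp (from `sync.timestamps`) and the newest received-but-unprocessed timestamp (from the `cloud_crdt_operation` table), with the local instance dropped since we never fetch our own operations back. A standalone sketch of that reconciliation, simplified to plain `u64` timestamps (assumes the `uuid` crate):

use std::collections::HashMap;

use uuid::Uuid;

// For each instance, resume fetching from the larger of "latest ingested"
// and "latest received but not yet ingested"; never fetch our own ops back.
fn resume_points(
    sync_timestamps: &HashMap<Uuid, u64>, // from sync.timestamps (ingested)
    db_timestamps: &HashMap<Uuid, u64>,   // from cloud_crdt_operation (received)
    this_instance: Uuid,
) -> HashMap<Uuid, u64> {
    let mut points: HashMap<Uuid, u64> = sync_timestamps
        .iter()
        .map(|(id, sync_ts)| {
            let cloud_ts = db_timestamps.get(id).copied().unwrap_or_default();
            (*id, Ord::max(cloud_ts, *sync_ts))
        })
        .collect();
    points.remove(&this_instance);
    points
}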
@@ -2,6 +2,7 @@ use super::CompressedCRDTOperations;
use sd_cloud_api::RequestConfigProvider;
use sd_core_sync::{GetOpsArgs, SyncMessage, NTP64};
use tracing::debug;
use uuid::Uuid;
use std::{sync::Arc, time::Duration};
@@ -42,22 +43,25 @@ pub async fn run_actor(
use sd_cloud_api::library::message_collections::do_add;
debug!(
"Preparing to send {} instances' operations to cloud",
req_adds.len()
);
// gets new operations for each instance to send to cloud
for req_add in req_adds {
let ops = err_break!(
sync.get_ops(GetOpsArgs {
count: 1000,
clocks: vec![(
req_add.instance_uuid,
NTP64(
req_add
.from_time
.unwrap_or_else(|| "0".to_string())
.parse()
.expect("couldn't parse ntp64 value"),
),
)],
})
sync.get_instance_ops(
1000,
req_add.instance_uuid,
NTP64(
req_add
.from_time
.unwrap_or_else(|| "0".to_string())
.parse()
.expect("couldn't parse ntp64 value"),
)
)
.await
);
@@ -72,6 +76,11 @@ pub async fn run_actor(
use base64::prelude::*;
debug!(
"Instance {}: {} to {}",
req_add.instance_uuid, start_time, end_time
);
instances.push(do_add::Input {
uuid: req_add.instance_uuid,
key: req_add.key,

@@ -86,6 +86,7 @@ impl LibraryConfig {
description: Option<String>,
instance_id: i32,
path: impl AsRef<Path>,
generate_sync_operations: bool,
) -> Result<Self, LibraryConfigError> {
let this = Self {
name,
@@ -94,7 +95,7 @@
version: Self::LATEST_VERSION,
cloud_id: None,
// will always be `true` eventually
generate_sync_operations: Arc::new(AtomicBool::new(false)),
generate_sync_operations: Arc::new(AtomicBool::new(generate_sync_operations)),
};
this.save(path).await.map(|()| this)

@@ -156,7 +156,7 @@ impl Libraries {
description: Option<String>,
node: &Arc<Node>,
) -> Result<Arc<Library>, LibraryManagerError> {
self.create_with_uuid(Uuid::new_v4(), name, description, true, None, node)
self.create_with_uuid(Uuid::new_v4(), name, description, true, None, node, false)
.await
}
@@ -169,6 +169,7 @@
// `None` will fall back to the default, as a library must be created with at least one instance
instance: Option<instance::Create>,
node: &Arc<Node>,
generate_sync_operations: bool,
) -> Result<Arc<Library>, LibraryManagerError> {
if name.as_ref().is_empty() || name.as_ref().chars().all(|x| x.is_whitespace()) {
return Err(LibraryManagerError::InvalidConfig(
@@ -184,6 +185,7 @@
// First instance will be zero
0,
&config_path,
generate_sync_operations,
)
.await?;

@@ -195,8 +195,8 @@ async fn execute_indexer_update_step(
// As this file was updated while Spacedrive was offline, we mark the object_id and cas_id as null
// so this file_path will be updated by the file identifier job
should_unlink_object
.then_some(((object_id::NAME, msgpack!(null)), object::disconnect())),
Some(((cas_id::NAME, msgpack!(null)), cas_id::set(None))),
.then_some(((object_id::NAME, msgpack!(nil)), object::disconnect())),
Some(((cas_id::NAME, msgpack!(nil)), cas_id::set(None))),
Some(sync_db_entry!(*is_dir, is_dir)),
Some(sync_db_entry!(
entry.metadata.size_in_bytes.to_be_bytes().to_vec(),

@@ -519,7 +519,7 @@ async fn inner_update_file(
Some(inode::set(Some(inode_to_db(inode)))),
)
} else {
((inode::NAME, msgpack!(null)), None)
((inode::NAME, msgpack!(nil)), None)
}
},
{
@@ -529,7 +529,7 @@
Some(hidden::set(Some(is_hidden))),
)
} else {
((hidden::NAME, msgpack!(null)), None)
((hidden::NAME, msgpack!(nil)), None)
}
},
]

@@ -127,7 +127,7 @@ impl InitConfig {
lib
} else {
let library = library_manager
.create_with_uuid(lib.id, lib.name, lib.description, true, None, node)
.create_with_uuid(lib.id, lib.name, lib.description, true, None, node, false)
.await?;
let Some(lib) = library_manager.get_library(&library.id).await else {

@@ -16,3 +16,4 @@ rspc = { workspace = true }
specta.workspace = true
base64.workspace = true
rmpv.workspace = true
tracing.workspace = true

@@ -56,7 +56,7 @@ pub fn module((model, sync_type): ModelWithSyncType) -> Module {
};
quote! {
#[derive(serde::Serialize, serde::Deserialize, Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct SyncId {
#(#fields),*
}

@@ -133,14 +133,18 @@ pub fn r#enum(models: Vec<ModelWithSyncType>) -> TokenStream {
let item_model_name_snake = snake_ident(item.related_model().name());
let item_field_name_snake = snake_ident(item.name());
quote!(db.#item_model_name_snake().find_unique(
prisma::#item_model_name_snake::#item_model_sync_id_field_name_snake::equals(
id.#item_field_name_snake.#item_model_sync_id_field_name_snake.clone()
)
))
quote! {
db.#item_model_name_snake()
.find_unique(
prisma::#item_model_name_snake::#item_model_sync_id_field_name_snake::equals(
id.#item_field_name_snake.#item_model_sync_id_field_name_snake.clone()
)
)
.select(prisma::#item_model_name_snake::select!({ id }))
}
};
[batch_item(item), batch_item(group)]
[batch_item(group), batch_item(item)]
};
let create_items = {
@@ -159,18 +163,22 @@ pub fn r#enum(models: Vec<ModelWithSyncType>) -> TokenStream {
};
quote! {
let (Some(item), Some(group)) =
db._batch((#(#db_batch_items),*)).await? else {
let (Some(group), Some(item)) =
(#(#db_batch_items.exec().await?),*) else {
panic!("item and group not found!");
};
let id = prisma::#model_name_snake::#compound_id(item.id, group.id);
let id = prisma::#model_name_snake::#compound_id(group.id, item.id);
match data {
sd_sync::CRDTOperationData::Create => {
db.#model_name_snake()
.create(
#(#create_items),*,
.upsert(
id,
prisma::#model_name_snake::create(
#(#create_items),*,
vec![]
),
vec![],
)
.exec()

@@ -83,7 +83,6 @@ impl CompressedCRDTOperations {
model: model_str.clone(),
record_id: record_id.clone(),
timestamp: op.timestamp,
id: op.id,
data: op.data,
})
}
@@ -98,7 +97,6 @@
#[derive(PartialEq, Serialize, Deserialize, Clone)]
pub struct CompressedCRDTOperation {
pub timestamp: NTP64,
pub id: Uuid,
pub data: CRDTOperationData,
}
@@ -106,7 +104,6 @@ impl From<CRDTOperation> for CompressedCRDTOperation {
fn from(value: CRDTOperation) -> Self {
Self {
timestamp: value.timestamp,
id: value.id,
data: value.data,
}
}
@@ -124,7 +121,6 @@ mod test {
CRDTOperation {
instance,
timestamp: NTP64(0),
id: Uuid::new_v4(),
model: "FilePath".to_string(),
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
@@ -132,7 +128,6 @@
CRDTOperation {
instance,
timestamp: NTP64(0),
id: Uuid::new_v4(),
model: "FilePath".to_string(),
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
@@ -140,7 +135,6 @@
CRDTOperation {
instance,
timestamp: NTP64(0),
id: Uuid::new_v4(),
model: "FilePath".to_string(),
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
@@ -148,7 +142,6 @@
CRDTOperation {
instance,
timestamp: NTP64(0),
id: Uuid::new_v4(),
model: "Object".to_string(),
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
@@ -156,7 +149,6 @@
CRDTOperation {
instance,
timestamp: NTP64(0),
id: Uuid::new_v4(),
model: "Object".to_string(),
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
@@ -164,7 +156,6 @@
CRDTOperation {
instance,
timestamp: NTP64(0),
id: Uuid::new_v4(),
model: "FilePath".to_string(),
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
@@ -172,7 +163,6 @@
CRDTOperation {
instance,
timestamp: NTP64(0),
id: Uuid::new_v4(),
model: "FilePath".to_string(),
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
@@ -201,17 +191,14 @@
rmpv::Value::Nil,
vec![
CompressedCRDTOperation {
id: Uuid::new_v4(),
timestamp: NTP64(0),
data: CRDTOperationData::Create,
},
CompressedCRDTOperation {
id: Uuid::new_v4(),
timestamp: NTP64(0),
data: CRDTOperationData::Create,
},
CompressedCRDTOperation {
id: Uuid::new_v4(),
timestamp: NTP64(0),
data: CRDTOperationData::Create,
},
@@ -224,12 +211,10 @@
rmpv::Value::Nil,
vec![
CompressedCRDTOperation {
id: Uuid::new_v4(),
timestamp: NTP64(0),
data: CRDTOperationData::Create,
},
CompressedCRDTOperation {
id: Uuid::new_v4(),
timestamp: NTP64(0),
data: CRDTOperationData::Create,
},
@@ -242,12 +227,10 @@
rmpv::Value::Nil,
vec![
CompressedCRDTOperation {
id: Uuid::new_v4(),
timestamp: NTP64(0),
data: CRDTOperationData::Create,
},
CompressedCRDTOperation {
id: Uuid::new_v4(),
timestamp: NTP64(0),
data: CRDTOperationData::Create,
},

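With the per-operation `id` gone, a compressed operation is only a timestamp plus its data; identity lives on the enclosing groups, and the test above expects one group per consecutive run of operations on the same model and record (3, then 2, then 2). A rough standalone sketch of that shape with simplified types (not the real implementation):

// One group per consecutive run of operations on the same model and record,
// so the identifying fields are stored once per run; each remaining entry
// is just a timestamp plus the operation data.
struct Op {
    model: String,
    record_id: String,
    timestamp: u64,
    data: String,
}

struct Group {
    model: String,
    record_id: String,
    ops: Vec<(u64, String)>,
}

fn compress(ops: Vec<Op>) -> Vec<Group> {
    let mut groups: Vec<Group> = Vec::new();
    for op in ops {
        let same_run = matches!(
            groups.last(),
            Some(g) if g.model == op.model && g.record_id == op.record_id
        );
        if same_run {
            let group = groups.last_mut().expect("just matched a last group");
            group.ops.push((op.timestamp, op.data));
        } else {
            groups.push(Group {
                model: op.model,
                record_id: op.record_id,
                ops: vec![(op.timestamp, op.data)],
            });
        }
    }
    groups
}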
@@ -50,7 +50,6 @@ pub struct CRDTOperation {
pub instance: Uuid,
#[specta(type = u32)]
pub timestamp: NTP64,
pub id: Uuid,
pub model: String,
#[specta(type = serde_json::Value)]
pub record_id: rmpv::Value,

@@ -7,9 +7,15 @@ use crate::{
};
macro_rules! msgpack {
($e:expr) => {
::rmpv::ext::to_value($e).expect("failed to serialize msgpack")
}
(nil) => {
::rmpv::Value::Nil
};
($e:expr) => {{
let bytes = rmp_serde::to_vec_named(&$e).expect("failed to serialize msgpack");
let value: rmpv::Value = rmp_serde::from_slice(&bytes).expect("failed to deserialize msgpack");
value
}}
}
pub trait OperationFactory {
@@ -26,7 +32,6 @@
CRDTOperation {
instance: self.get_instance(),
timestamp: *timestamp.get_time(),
id: Uuid::new_v4(),
model: TModel::MODEL.to_string(),
record_id: msgpack!(id),
data,
@@ -114,10 +119,9 @@
#[macro_export]
macro_rules! sync_entry {
($v:expr, $($m:tt)*) => {{
let v = $v;
($($m)*::NAME, ::rmpv::ext::to_value(&v).expect("failed to serialize msgpack"))
}}
($v:expr, $($m:tt)*) => {
($($m)*::NAME, ::sd_utils::msgpack!($v))
}
}
#[macro_export]

@@ -29,10 +29,13 @@ pub fn from_bytes_to_uuid(bytes: &[u8]) -> Uuid {
#[macro_export]
macro_rules! msgpack {
(null) => {
(nil) => {
::rmpv::Value::Nil
};
($e:expr) => {
::rmpv::ext::to_value(&$e).expect("failed to serialize msgpack")
}
($e:expr) => {{
let bytes = rmp_serde::to_vec_named(&$e).expect("failed to serialize msgpack");
let value: rmpv::Value = rmp_serde::from_slice(&bytes).expect("failed to deserialize msgpack");
value
}}
}
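Both copies of the macro now serialize with `rmp_serde::to_vec_named` and parse the bytes back into an `rmpv::Value`, instead of calling `rmpv::ext::to_value` directly; the named round-trip presumably keeps struct fields keyed by name rather than by position, which is what the sync layer needs when it later reads values by field. Note that the `(nil)` arm must stay ahead of `($e:expr)`, since a bare `nil` token would otherwise match as an expression. A small self-contained check (assumes the `serde`, `rmp-serde`, and serde-enabled `rmpv` crates):

use serde::Serialize;

#[derive(Serialize)]
struct Entry {
    name: String,
}

fn main() {
    // The same round-trip the macro performs: named msgpack bytes -> rmpv::Value.
    let bytes = rmp_serde::to_vec_named(&Entry { name: "tag".into() })
        .expect("failed to serialize msgpack");
    let value: rmpv::Value =
        rmp_serde::from_slice(&bytes).expect("failed to deserialize msgpack");
    // Prints a map value keyed by field name: {"name": "tag"}.
    println!("{value}");
}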

@@ -154,7 +154,7 @@ export type Backup = ({ id: string; timestamp: string; library_id: string; libra
export type BuildInfo = { version: string; commit: string }
export type CRDTOperation = { instance: string; timestamp: number; id: string; model: string; record_id: JsonValue; data: CRDTOperationData }
export type CRDTOperation = { instance: string; timestamp: number; model: string; record_id: JsonValue; data: CRDTOperationData }
export type CRDTOperationData = "c" | { u: { field: string; value: JsonValue } } | "d"