diff --git a/core/crates/sync/README.md b/core/crates/sync/README.md index 73791e4d7..11d8f53da 100644 --- a/core/crates/sync/README.md +++ b/core/crates/sync/README.md @@ -2,8 +2,6 @@ Spacedrive's sync system. Consumes types and helpers from `sd-sync`. -## Using Sync - ### Creating Records Prepare a sync id by creating or obtaining its value, @@ -101,3 +99,34 @@ sync.write_ops( ) ) ``` + +### Setting Relation Fields + +Setting relation fields requires providing the Sync ID of the relation. +Setting the relation field's scalar fields instead will not properly sync then relation, +usually because the scalar fields are local and disconnected from the Sync ID. + +```rs +let (sync_params, db_params): (Vec<_>, Vec<_>) = [ + sync_db_entry!( + prisma_sync::object::SyncId { pub_id: object_pub_id }, + file_path::object + ) +].into_iter().unzip(); + +sync.write_ops( + db, + ( + sync.shared_update( + prisma_sync::file_path::SyncId { + pub_id: file_path_pub_id + }, + sync_params + ), + db.file_path().update( + file_path::id::equals(file_path_id), + db_params + ) + ) +) +``` diff --git a/core/crates/sync/src/db_operation.rs b/core/crates/sync/src/db_operation.rs index 36ff15e63..ce9bb6ab9 100644 --- a/core/crates/sync/src/db_operation.rs +++ b/core/crates/sync/src/db_operation.rs @@ -41,14 +41,17 @@ impl cloud_crdt_include::Data { Uuid::from_slice(&self.instance.pub_id).unwrap() } - pub fn into_operation(self) -> CRDTOperation { - CRDTOperation { - instance: self.instance(), - timestamp: self.timestamp(), - record_id: rmp_serde::from_slice(&self.record_id).unwrap(), - model: self.model as u16, - data: serde_json::from_slice(&self.data).unwrap(), - } + pub fn into_operation(self) -> (i32, CRDTOperation) { + ( + self.id, + CRDTOperation { + instance: self.instance(), + timestamp: self.timestamp(), + record_id: rmp_serde::from_slice(&self.record_id).unwrap(), + model: self.model as u16, + data: serde_json::from_slice(&self.data).unwrap(), + }, + ) } } diff --git a/core/crates/sync/src/ingest.rs b/core/crates/sync/src/ingest.rs index aeee7d3b2..df445976c 100644 --- a/core/crates/sync/src/ingest.rs +++ b/core/crates/sync/src/ingest.rs @@ -31,7 +31,7 @@ pub enum Request { timestamps: Vec<(Uuid, NTP64)>, tx: oneshot::Sender<()>, }, - Ingested, + // Ingested, FinishedIngesting, } @@ -129,6 +129,10 @@ impl Actor { } } + if let Some(tx) = event.wait_tx { + tx.send(()).ok(); + } + match event.has_more { true => State::RetrievingMessages, false => { @@ -421,7 +425,7 @@ impl Actor { self.timestamps.write().await.insert(instance, new_ts); - self.io.req_tx.send(Request::Ingested).await.ok(); + // self.io.req_tx.send(Request::Ingested).await.ok(); Ok(()) } @@ -445,6 +449,7 @@ pub struct MessagesEvent { pub instance_id: Uuid, pub messages: CompressedCRDTOperations, pub has_more: bool, + pub wait_tx: Option>, } impl ActorTypes for Actor { diff --git a/core/crates/sync/src/manager.rs b/core/crates/sync/src/manager.rs index ff7c0217f..1a6a52378 100644 --- a/core/crates/sync/src/manager.rs +++ b/core/crates/sync/src/manager.rs @@ -246,7 +246,7 @@ impl Manager { pub async fn get_cloud_ops( &self, args: GetOpsArgs, - ) -> prisma_client_rust::Result> { + ) -> prisma_client_rust::Result> { let db = &self.db; macro_rules! db_args { diff --git a/core/crates/sync/tests/mock_instance.rs b/core/crates/sync/tests/mock_instance.rs index 84b2e4de2..513833dfc 100644 --- a/core/crates/sync/tests/mock_instance.rs +++ b/core/crates/sync/tests/mock_instance.rs @@ -128,13 +128,14 @@ impl Instance { messages: CompressedCRDTOperations::new(messages), has_more: false, instance_id: instance1.id, + wait_tx: None, })) .await .unwrap(); } - ingest::Request::Ingested => { - instance2.sync.tx.send(SyncMessage::Ingested).ok(); - } + // ingest::Request::Ingested => { + // instance2.sync.tx.send(SyncMessage::Ingested).ok(); + // } ingest::Request::FinishedIngesting => {} } } diff --git a/core/src/cloud/sync/ingest.rs b/core/src/cloud/sync/ingest.rs index dda8dc336..8801ebceb 100644 --- a/core/src/cloud/sync/ingest.rs +++ b/core/src/cloud/sync/ingest.rs @@ -1,3 +1,4 @@ +use sd_prisma::prisma::cloud_crdt_operation; use sd_sync::CompressedCRDTOperations; use std::sync::{ atomic::{AtomicBool, Ordering}, @@ -44,13 +45,15 @@ pub async fn run_actor( _ => continue, }; - let ops = err_break!( + let (ops_ids, ops): (Vec<_>, Vec<_>) = err_break!( sync.get_cloud_ops(GetOpsArgs { clocks: timestamps, count: OPS_PER_REQUEST, }) .await - ); + ) + .into_iter() + .unzip(); if ops.is_empty() { break; @@ -63,6 +66,8 @@ pub async fn run_actor( ops.last().map(|operation| operation.timestamp.as_u64()), ); + let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>(); + err_break!( sync.ingest .event_tx @@ -70,9 +75,20 @@ pub async fn run_actor( instance_id: sync.instance, has_more: ops.len() == OPS_PER_REQUEST as usize, messages: CompressedCRDTOperations::new(ops), + wait_tx: Some(wait_tx) })) .await ); + + err_break!(wait_rx.await); + + err_break!( + sync.db + .cloud_crdt_operation() + .delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)]) + .exec() + .await + ); } } } diff --git a/core/src/p2p/sync/mod.rs b/core/src/p2p/sync/mod.rs index 5cbb0b0e3..1106b90f5 100644 --- a/core/src/p2p/sync/mod.rs +++ b/core/src/p2p/sync/mod.rs @@ -244,15 +244,20 @@ mod responder { let rx::Operations(ops) = rx::Operations::from_stream(stream).await.unwrap(); + let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>(); + ingest .event_tx .send(Event::Messages(MessagesEvent { instance_id: library.sync.instance, has_more: ops.len() == OPS_PER_REQUEST as usize, messages: ops, + wait_tx: Some(wait_tx), })) .await .expect("TODO: Handle ingest channel closed, so we don't loose ops"); + + wait_rx.await.unwrap() } debug!("Sync responder done");