[ENG-1776] Delete cloud ops after they've been ingested (#2512)

* delete cloud ops after they've been ingested

* give wait_tx to p2p sync ingest

* remove Ingested event

* add sync docs for setting relation fields

* Update core/crates/sync/README.md

Co-authored-by: Oscar Beaumont <oscar@otbeaumont.me>

---------

Co-authored-by: Oscar Beaumont <oscar@otbeaumont.me>
This commit is contained in:
Brendan Allan 2024-05-31 14:54:22 +08:00 committed by GitHub
parent ab97572878
commit 735e80ad4d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 77 additions and 18 deletions

View file

@ -2,8 +2,6 @@
Spacedrive's sync system. Consumes types and helpers from `sd-sync`.
## Using Sync
### Creating Records
Prepare a sync id by creating or obtaining its value,
@ -101,3 +99,34 @@ sync.write_ops(
)
)
```
### Setting Relation Fields
Setting relation fields requires providing the Sync ID of the relation.
Setting the relation field's scalar fields instead will not properly sync then relation,
usually because the scalar fields are local and disconnected from the Sync ID.
```rs
let (sync_params, db_params): (Vec<_>, Vec<_>) = [
sync_db_entry!(
prisma_sync::object::SyncId { pub_id: object_pub_id },
file_path::object
)
].into_iter().unzip();
sync.write_ops(
db,
(
sync.shared_update(
prisma_sync::file_path::SyncId {
pub_id: file_path_pub_id
},
sync_params
),
db.file_path().update(
file_path::id::equals(file_path_id),
db_params
)
)
)
```

View file

@ -41,14 +41,17 @@ impl cloud_crdt_include::Data {
Uuid::from_slice(&self.instance.pub_id).unwrap()
}
pub fn into_operation(self) -> CRDTOperation {
CRDTOperation {
instance: self.instance(),
timestamp: self.timestamp(),
record_id: rmp_serde::from_slice(&self.record_id).unwrap(),
model: self.model as u16,
data: serde_json::from_slice(&self.data).unwrap(),
}
pub fn into_operation(self) -> (i32, CRDTOperation) {
(
self.id,
CRDTOperation {
instance: self.instance(),
timestamp: self.timestamp(),
record_id: rmp_serde::from_slice(&self.record_id).unwrap(),
model: self.model as u16,
data: serde_json::from_slice(&self.data).unwrap(),
},
)
}
}

View file

@ -31,7 +31,7 @@ pub enum Request {
timestamps: Vec<(Uuid, NTP64)>,
tx: oneshot::Sender<()>,
},
Ingested,
// Ingested,
FinishedIngesting,
}
@ -129,6 +129,10 @@ impl Actor {
}
}
if let Some(tx) = event.wait_tx {
tx.send(()).ok();
}
match event.has_more {
true => State::RetrievingMessages,
false => {
@ -421,7 +425,7 @@ impl Actor {
self.timestamps.write().await.insert(instance, new_ts);
self.io.req_tx.send(Request::Ingested).await.ok();
// self.io.req_tx.send(Request::Ingested).await.ok();
Ok(())
}
@ -445,6 +449,7 @@ pub struct MessagesEvent {
pub instance_id: Uuid,
pub messages: CompressedCRDTOperations,
pub has_more: bool,
pub wait_tx: Option<oneshot::Sender<()>>,
}
impl ActorTypes for Actor {

View file

@ -246,7 +246,7 @@ impl Manager {
pub async fn get_cloud_ops(
&self,
args: GetOpsArgs,
) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
) -> prisma_client_rust::Result<Vec<(i32, CRDTOperation)>> {
let db = &self.db;
macro_rules! db_args {

View file

@ -128,13 +128,14 @@ impl Instance {
messages: CompressedCRDTOperations::new(messages),
has_more: false,
instance_id: instance1.id,
wait_tx: None,
}))
.await
.unwrap();
}
ingest::Request::Ingested => {
instance2.sync.tx.send(SyncMessage::Ingested).ok();
}
// ingest::Request::Ingested => {
// instance2.sync.tx.send(SyncMessage::Ingested).ok();
// }
ingest::Request::FinishedIngesting => {}
}
}

View file

@ -1,3 +1,4 @@
use sd_prisma::prisma::cloud_crdt_operation;
use sd_sync::CompressedCRDTOperations;
use std::sync::{
atomic::{AtomicBool, Ordering},
@ -44,13 +45,15 @@ pub async fn run_actor(
_ => continue,
};
let ops = err_break!(
let (ops_ids, ops): (Vec<_>, Vec<_>) = err_break!(
sync.get_cloud_ops(GetOpsArgs {
clocks: timestamps,
count: OPS_PER_REQUEST,
})
.await
);
)
.into_iter()
.unzip();
if ops.is_empty() {
break;
@ -63,6 +66,8 @@ pub async fn run_actor(
ops.last().map(|operation| operation.timestamp.as_u64()),
);
let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>();
err_break!(
sync.ingest
.event_tx
@ -70,9 +75,20 @@ pub async fn run_actor(
instance_id: sync.instance,
has_more: ops.len() == OPS_PER_REQUEST as usize,
messages: CompressedCRDTOperations::new(ops),
wait_tx: Some(wait_tx)
}))
.await
);
err_break!(wait_rx.await);
err_break!(
sync.db
.cloud_crdt_operation()
.delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)])
.exec()
.await
);
}
}
}

View file

@ -244,15 +244,20 @@ mod responder {
let rx::Operations(ops) = rx::Operations::from_stream(stream).await.unwrap();
let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>();
ingest
.event_tx
.send(Event::Messages(MessagesEvent {
instance_id: library.sync.instance,
has_more: ops.len() == OPS_PER_REQUEST as usize,
messages: ops,
wait_tx: Some(wait_tx),
}))
.await
.expect("TODO: Handle ingest channel closed, so we don't loose ops");
wait_rx.await.unwrap()
}
debug!("Sync responder done");