diff --git a/core/src/sync/manager.rs b/core/src/sync/manager.rs index aa22ebea7..d3b1b4ad2 100644 --- a/core/src/sync/manager.rs +++ b/core/src/sync/manager.rs @@ -6,7 +6,7 @@ use crate::{ use sd_sync::*; use futures::future::join_all; -use serde_json::{from_slice, from_value, json, to_vec, Value}; +use serde_json::{from_value, json, to_vec, Value}; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -147,69 +147,48 @@ impl SyncManager { } pub async fn get_ops(&self) -> prisma_client_rust::Result> { - owned_operation::include!(owned_op_with_node { node }); + let db = &self.db; - impl TryInto for owned_op_with_node::Data { - type Error = (); - - fn try_into(self) -> Result { - let id = Uuid::from_slice(&self.id).map_err(|_| ())?; - let node = Uuid::from_slice(&self.node.pub_id).map_err(|_| ())?; - - Ok(CRDTOperation { - id, - node, - timestamp: NTP64(self.timestamp as u64), - typ: CRDTOperationType::Owned(OwnedOperation { - model: self.model, - items: serde_json::from_slice(&self.data).unwrap(), - }), - }) - } - } - - shared_operation::include!(shared_op_with_node { node }); - - impl TryInto for shared_op_with_node::Data { - type Error = (); - - fn try_into(self) -> Result { - let id = Uuid::from_slice(&self.id).map_err(|_| ())?; - let node = Uuid::from_slice(&self.node.pub_id).map_err(|_| ())?; - - Ok(CRDTOperation { - id, - node, - timestamp: NTP64(self.timestamp as u64), - typ: CRDTOperationType::Shared(SharedOperation { - record_id: serde_json::from_slice(&self.record_id).unwrap(), - model: self.model, - data: from_slice(&self.data).unwrap(), - }), - }) - } - } - - let owned = self - .db + let owned = db .owned_operation() .find_many(vec![]) - .include(owned_op_with_node::include()) + .include(owned_operation::include!({ node })) .exec() .await? .into_iter() - .map(TryInto::try_into); - let shared = self - .db + .flat_map(|op| { + Some(CRDTOperation { + id: Uuid::from_slice(&op.id).ok()?, + node: Uuid::from_slice(&op.node.pub_id).ok()?, + timestamp: NTP64(op.timestamp as u64), + typ: CRDTOperationType::Owned(OwnedOperation { + model: op.model, + items: serde_json::from_slice(&op.data).ok()?, + }), + }) + }); + + let shared = db .shared_operation() .find_many(vec![]) - .include(shared_op_with_node::include()) + .include(shared_operation::include!({ node })) .exec() .await? .into_iter() - .map(TryInto::try_into); + .flat_map(|op| { + Some(CRDTOperation { + id: Uuid::from_slice(&op.id).ok()?, + node: Uuid::from_slice(&op.node.pub_id).ok()?, + timestamp: NTP64(op.timestamp as u64), + typ: CRDTOperationType::Shared(SharedOperation { + record_id: serde_json::from_slice(&op.record_id).ok()?, + model: op.model, + data: serde_json::from_slice(&op.data).ok()?, + }), + }) + }); - let mut result: Vec = owned.chain(shared).flatten().collect(); + let mut result: Vec = owned.chain(shared).collect(); result.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));