remove TryInto usage in sync manager

This commit is contained in:
Brendan Allan 2023-02-01 11:42:43 +08:00
parent 95ba2e1246
commit 58da1822e5

View file

@ -6,7 +6,7 @@ use crate::{
use sd_sync::*; use sd_sync::*;
use futures::future::join_all; use futures::future::join_all;
use serde_json::{from_slice, from_value, json, to_vec, Value}; use serde_json::{from_value, json, to_vec, Value};
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
sync::Arc, sync::Arc,
@ -147,69 +147,48 @@ impl SyncManager {
} }
pub async fn get_ops(&self) -> prisma_client_rust::Result<Vec<CRDTOperation>> { pub async fn get_ops(&self) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
owned_operation::include!(owned_op_with_node { node }); let db = &self.db;
impl TryInto<CRDTOperation> for owned_op_with_node::Data { let owned = db
type Error = ();
fn try_into(self) -> Result<CRDTOperation, Self::Error> {
let id = Uuid::from_slice(&self.id).map_err(|_| ())?;
let node = Uuid::from_slice(&self.node.pub_id).map_err(|_| ())?;
Ok(CRDTOperation {
id,
node,
timestamp: NTP64(self.timestamp as u64),
typ: CRDTOperationType::Owned(OwnedOperation {
model: self.model,
items: serde_json::from_slice(&self.data).unwrap(),
}),
})
}
}
shared_operation::include!(shared_op_with_node { node });
impl TryInto<CRDTOperation> for shared_op_with_node::Data {
type Error = ();
fn try_into(self) -> Result<CRDTOperation, Self::Error> {
let id = Uuid::from_slice(&self.id).map_err(|_| ())?;
let node = Uuid::from_slice(&self.node.pub_id).map_err(|_| ())?;
Ok(CRDTOperation {
id,
node,
timestamp: NTP64(self.timestamp as u64),
typ: CRDTOperationType::Shared(SharedOperation {
record_id: serde_json::from_slice(&self.record_id).unwrap(),
model: self.model,
data: from_slice(&self.data).unwrap(),
}),
})
}
}
let owned = self
.db
.owned_operation() .owned_operation()
.find_many(vec![]) .find_many(vec![])
.include(owned_op_with_node::include()) .include(owned_operation::include!({ node }))
.exec() .exec()
.await? .await?
.into_iter() .into_iter()
.map(TryInto::try_into); .flat_map(|op| {
let shared = self Some(CRDTOperation {
.db id: Uuid::from_slice(&op.id).ok()?,
node: Uuid::from_slice(&op.node.pub_id).ok()?,
timestamp: NTP64(op.timestamp as u64),
typ: CRDTOperationType::Owned(OwnedOperation {
model: op.model,
items: serde_json::from_slice(&op.data).ok()?,
}),
})
});
let shared = db
.shared_operation() .shared_operation()
.find_many(vec![]) .find_many(vec![])
.include(shared_op_with_node::include()) .include(shared_operation::include!({ node }))
.exec() .exec()
.await? .await?
.into_iter() .into_iter()
.map(TryInto::try_into); .flat_map(|op| {
Some(CRDTOperation {
id: Uuid::from_slice(&op.id).ok()?,
node: Uuid::from_slice(&op.node.pub_id).ok()?,
timestamp: NTP64(op.timestamp as u64),
typ: CRDTOperationType::Shared(SharedOperation {
record_id: serde_json::from_slice(&op.record_id).ok()?,
model: op.model,
data: serde_json::from_slice(&op.data).ok()?,
}),
})
});
let mut result: Vec<CRDTOperation> = owned.chain(shared).flatten().collect(); let mut result: Vec<CRDTOperation> = owned.chain(shared).collect();
result.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)); result.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));