ignore update after delete in sync (#2327)
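Instead of applying every incoming CRDT operation, the ingester now resolves conflicts up front: an incoming Create is ignored when a Delete is already recorded for that record, and an incoming Update is ignored when no Create is recorded or when a newer Update to the same field already exists. Ingestion also returns a Result instead of swallowing errors, the sync manager records the latest timestamp per instance on every write, and a new no_update_after_delete test covers the update-after-delete case.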

Brendan Allan 2024-04-17 12:57:45 +08:00 committed by GitHub
parent 3505f33448
commit a315dd632d
5 changed files with 166 additions and 74 deletions
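
For readers skimming the diff, here is a minimal, self-contained sketch of just the two conflict guards. Kind, StoredOp, and should_ignore are illustrative stand-ins for sd_sync's OperationKind and the prisma queries against the crdt_operation table, not code from this commit:

// Sketch only: mirrors the guards added to Actor::receive_crdt_operation below.
#[derive(Clone, Copy, PartialEq, Eq)]
enum Kind<'a> {
	Create,
	Update(&'a str),
	Delete,
}

// Stand-in for one row of the crdt_operation table.
struct StoredOp<'a> {
	kind: Kind<'a>,
	timestamp: u64,
}

// Should an incoming operation be dropped before being applied?
fn should_ignore<'a>(stored: &[StoredOp<'a>], incoming: Kind<'a>, incoming_ts: u64) -> bool {
	match incoming {
		// a Create loses to any Delete already recorded for the record
		Kind::Create => stored.iter().any(|op| op.kind == Kind::Delete),
		// an Update loses when no Create is recorded, or when a newer
		// Update to the same field is already recorded
		Kind::Update(field) => {
			!stored.iter().any(|op| op.kind == Kind::Create)
				|| stored
					.iter()
					.any(|op| op.kind == Kind::Update(field) && op.timestamp > incoming_ts)
		}
		// Deletes always apply
		_ => false,
	}
}

fn main() {
	let stored = [
		StoredOp { kind: Kind::Create, timestamp: 1 },
		StoredOp { kind: Kind::Delete, timestamp: 2 },
	];
	// a Create arriving after the record's Delete is ignored
	assert!(should_ignore(&stored, Kind::Create, 3));
}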

View file

@@ -7,7 +7,7 @@ use sd_prisma::{
 	prisma::{crdt_operation, SortOrder},
 	prisma_sync::ModelSyncData,
 };
-use sd_sync::CRDTOperation;
+use sd_sync::{CRDTOperation, OperationKind};
 use tokio::sync::{mpsc, oneshot, Mutex};
 use tracing::debug;
 use uhlc::{Timestamp, NTP64};
@@ -77,17 +77,16 @@ impl Actor {
 			State::RetrievingMessages => {
 				let (tx, mut rx) = oneshot::channel::<()>();
 
+				let timestamps = self
+					.timestamps
+					.read()
+					.await
+					.iter()
+					.map(|(&k, &v)| (k, v))
+					.collect();
+
 				self.io
-					.send(Request::Messages {
-						timestamps: self
-							.timestamps
-							.read()
-							.await
-							.iter()
-							.map(|(&k, &v)| (k, v))
-							.collect(),
-						tx,
-					})
+					.send(Request::Messages { timestamps, tx })
 					.await
 					.ok();
@@ -162,7 +161,12 @@ impl Actor {
 	}
 
 	// where the magic happens
-	async fn receive_crdt_operation(&mut self, op: CRDTOperation) {
+	async fn receive_crdt_operation(
+		&mut self,
+		op: CRDTOperation,
+	) -> prisma_client_rust::Result<()> {
+		let db = &self.db;
+
 		// first, we update the HLC's timestamp with the incoming one.
 		// this involves a drift check + sets the last time of the clock
 		self.clock
@@ -176,65 +180,87 @@ impl Actor {
 		let op_instance = op.instance;
 		let op_timestamp = op.timestamp;
 
-		if !self.is_operation_old(&op).await {
-			// actually go and apply the operation in the db
-			self.apply_op(op).await.ok();
+		// resolve conflicts
+		// this can be outside the transaction as there's only ever one ingester
+		match op.kind() {
+			sd_sync::OperationKind::Create => {
+				let delete = db
+					.crdt_operation()
+					.find_first(vec![
+						crdt_operation::model::equals(op.model as i32),
+						crdt_operation::record_id::equals(
+							rmp_serde::to_vec(&op.record_id).unwrap(),
+						),
+						crdt_operation::kind::equals(OperationKind::Delete.to_string()),
+					])
+					.order_by(crdt_operation::timestamp::order(SortOrder::Desc))
+					.exec()
+					.await?;
 
-			// update the stored timestamp for this instance - will be derived from the crdt operations table on restart
-			self.timestamps.write().await.insert(
-				op_instance,
-				NTP64::max(timestamp.unwrap_or_default(), op_timestamp),
-			);
-		}
-	}
+				if delete.is_some() {
+					return Ok(());
+				}
+			}
+			sd_sync::OperationKind::Update(field) => {
+				let (create, update) = db
+					._batch((
+						db.crdt_operation()
+							.find_first(vec![
+								crdt_operation::model::equals(op.model as i32),
+								crdt_operation::record_id::equals(
+									rmp_serde::to_vec(&op.record_id).unwrap(),
+								),
+								crdt_operation::kind::equals(OperationKind::Create.to_string()),
+							])
+							.order_by(crdt_operation::timestamp::order(SortOrder::Desc)),
+						db.crdt_operation()
+							.find_first(vec![
+								crdt_operation::timestamp::gt(op.timestamp.as_u64() as i64),
+								crdt_operation::model::equals(op.model as i32),
+								crdt_operation::record_id::equals(
+									rmp_serde::to_vec(&op.record_id).unwrap(),
+								),
+								crdt_operation::kind::equals(
+									OperationKind::Update(field).to_string(),
+								),
+							])
+							.order_by(crdt_operation::timestamp::order(SortOrder::Desc)),
+					))
+					.await?;
 
-	async fn apply_op(&mut self, op: CRDTOperation) -> prisma_client_rust::Result<()> {
+				if create.is_none() || update.is_some() {
+					return Ok(());
+				}
+			}
+			_ => {}
+		};
+
 		// we don't want these writes to not apply together!
 		self.db
 			._transaction()
 			.with_timeout(30 * 1000)
 			.run(|db| async move {
 				// apply the operation to the actual record
-				let sync_data = ModelSyncData::from_op(op.clone());
-				sync_data.unwrap().exec(&db).await?;
+				ModelSyncData::from_op(op.clone())
+					.unwrap()
+					.exec(&db)
+					.await
+					.unwrap();
 
 				// write the operation to the operations table
-				write_crdt_op_to_db(&op, &db).await?;
-
-				Ok(())
+				write_crdt_op_to_db(&op, &db).await
 			})
 			.await?;
 
+		// update the stored timestamp for this instance - will be derived from the crdt operations table on restart
+		let new_ts = NTP64::max(timestamp.unwrap_or_default(), op_timestamp);
+		self.timestamps.write().await.insert(op_instance, new_ts);
+
 		// dbg!(self.instance, op_instance);
 		self.io.req_tx.send(Request::Ingested).await.ok();
 
 		Ok(())
 	}
-
-	// determines if an operation is old and shouldn't be applied
-	async fn is_operation_old(&mut self, op: &CRDTOperation) -> bool {
-		let db = &self.db;
-
-		let old_timestamp = {
-			let newer_op = db
-				.crdt_operation()
-				.find_first(vec![
-					crdt_operation::timestamp::gte(op.timestamp.as_u64() as i64),
-					crdt_operation::model::equals(op.model as i32),
-					crdt_operation::record_id::equals(serde_json::to_vec(&op.record_id).unwrap()),
-					crdt_operation::kind::equals(op.kind().to_string()),
-				])
-				.order_by(crdt_operation::timestamp::order(SortOrder::Desc))
-				.exec()
-				.await
-				.unwrap();
-
-			newer_op.map(|newer_op| newer_op.timestamp)
-		};
-
-		old_timestamp
-			.map(|old| old != op.timestamp.as_u64() as i64)
-			.unwrap_or_default()
-	}
 }
 
 impl Deref for Actor {
@@ -295,18 +321,12 @@ mod test {
 		for _ in 0..10 {
 			let mut rx = ingest.req_rx.lock().await;
 			println!("lock acquired");
 
 			ingest.event_tx.send(Event::Notification).await.unwrap();
 			println!("notification sent");
 
 			let Some(Request::Messages { .. }) = rx.recv().await else {
 				panic!("bruh")
 			};
 			println!("message received");
 
 			// without this the test hangs, idk
 			tokio::time::sleep(Duration::from_millis(0)).await;
 		}

View file

@@ -86,24 +86,30 @@ impl Manager {
 	pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>(
 		&self,
 		tx: &PrismaClient,
-		(mut _ops, queries): (Vec<CRDTOperation>, I),
+		(mut ops, queries): (Vec<CRDTOperation>, I),
 	) -> prisma_client_rust::Result<<I as prisma_client_rust::BatchItemParent>::ReturnValue> {
 		let ret = if self.emit_messages_flag.load(atomic::Ordering::Relaxed) {
 			let lock = self.timestamp_lock.acquire().await;
 
-			_ops.iter_mut().for_each(|op| {
+			ops.iter_mut().for_each(|op| {
 				op.timestamp = *self.get_clock().new_timestamp().get_time();
 			});
 
 			let (res, _) = tx
 				._batch((
 					queries,
-					_ops.iter()
+					ops.iter()
 						.map(|op| crdt_op_db(op).to_query(tx))
 						.collect::<Vec<_>>(),
 				))
 				.await?;
 
+			self.shared
+				.timestamps
+				.write()
+				.await
+				.insert(self.instance, ops.last().unwrap().timestamp);
+
 			self.tx.send(SyncMessage::Created).ok();
 
 			drop(lock);
@@ -139,6 +145,12 @@ impl Manager {
 			tx._batch(vec![query]).await?.remove(0)
 		};
 
+		self.shared
+			.timestamps
+			.write()
+			.await
+			.insert(self.instance, op.timestamp);
+
 		Ok(ret)
 	}

View file

@@ -4,13 +4,15 @@ use sd_core_sync::*;
 use sd_prisma::{prisma, prisma_sync};
 use sd_sync::*;
-use sd_utils::uuid_to_bytes;
+use sd_utils::{msgpack, uuid_to_bytes};
 
 use mock_instance::Instance;
 use uuid::Uuid;
 
-async fn write_test_location(instance: &Instance) -> Result<(), Box<dyn std::error::Error>> {
-	instance
+async fn write_test_location(
+	instance: &Instance,
+) -> Result<prisma::location::Data, Box<dyn std::error::Error>> {
+	Ok(instance
 		.sync
 		.write_ops(&instance.db, {
 			let id = Uuid::new_v4();
@@ -35,9 +37,7 @@ async fn write_test_location(instance: &Instance) -> Result<(), Box<dyn std::err
 			instance.db.location().create(uuid_to_bytes(id), db_ops),
 		)
 	})
-	.await?;
-
-	Ok(())
+	.await?)
 }
 
 #[tokio::test]
@@ -96,3 +96,63 @@ async fn operations_send_and_ingest() -> Result<(), Box<dyn std::error::Error>>
 	Ok(())
 }
+
+#[tokio::test]
+async fn no_update_after_delete() -> Result<(), Box<dyn std::error::Error>> {
+	let instance1 = Instance::new(Uuid::new_v4()).await;
+	let instance2 = Instance::new(Uuid::new_v4()).await;
+
+	Instance::pair(&instance1, &instance2).await;
+
+	let location = write_test_location(&instance1).await?;
+
+	assert!(matches!(
+		instance2.sync_rx.resubscribe().recv().await?,
+		SyncMessage::Ingested
+	));
+
+	instance2
+		.sync
+		.write_op(
+			&instance2.db,
+			instance2.sync.shared_delete(prisma_sync::location::SyncId {
+				pub_id: location.pub_id.clone(),
+			}),
+			instance2.db.location().delete_many(vec![]),
+		)
+		.await?;
+
+	assert!(matches!(
+		instance1.sync_rx.resubscribe().recv().await?,
+		SyncMessage::Ingested
+	));
+
+	instance1
+		.sync
+		.write_op(
+			&instance1.db,
+			instance1.sync.shared_update(
+				prisma_sync::location::SyncId {
+					pub_id: location.pub_id.clone(),
+				},
+				"name",
+				msgpack!("New Location"),
+			),
+			instance1.db.location().find_many(vec![]),
+		)
+		.await
+		.ok();
+
+	// one spare update operation that actually gets ignored by instance 2
+	assert_eq!(instance1.db.crdt_operation().count(vec![]).exec().await?, 5);
+	assert_eq!(instance2.db.crdt_operation().count(vec![]).exec().await?, 4);
+
+	assert_eq!(instance1.db.location().count(vec![]).exec().await?, 0);
+	// the whole point of the test - the update (which is ingested as an upsert) should be ignored
+	assert_eq!(instance2.db.location().count(vec![]).exec().await?, 0);
+
+	instance1.teardown().await;
+	instance2.teardown().await;
+
+	Ok(())
+}

View file

@@ -40,7 +40,7 @@ pub fn r#enum(models: Vec<ModelWithSyncType>) -> TokenStream {
 			ModelSyncType::Shared { id, .. } => {
 				let (get_id, equals_value, id_name_snake, create_id) = match id.refine() {
 					RefinedFieldWalker::Relation(rel) => {
-						let scalar_field = rel.referenced_fields().unwrap().next().unwrap();
+						let scalar_field = rel.fields().unwrap().next().unwrap();
 						let id_name_snake = snake_ident(scalar_field.name());
 						let field_name_snake = snake_ident(rel.name());
 						let opposite_model_name_snake =

View file

@@ -66,9 +66,9 @@ impl CRDTOperation {
 impl Debug for CRDTOperation {
 	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 		f.debug_struct("CRDTOperation")
 			.field("instance", &self.instance.to_string())
 			.field("timestamp", &self.timestamp.to_string())
-			// .field("typ", &self.typ)
-			.field("data", &self.data)
+			.field("model", &self.model)
+			.field("record_id", &self.record_id.to_string())
 			.finish()
 	}
 }