Add some documentation for the sync ingester (#2028)

This commit is contained in:
Brendan Allan 2024-02-01 12:58:48 +08:00 committed by GitHub
parent 01ae0d01fa
commit d1027a51fa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -116,28 +116,30 @@ impl Actor {
}
}
// where the magic happens
async fn receive_crdt_operation(&mut self, op: CRDTOperation) {
// first, we update the HLC's timestamp with the incoming one.
// this involves a drift check + sets the last time of the clock
self.clock
.update_with_timestamp(&Timestamp::new(op.timestamp, op.instance.into()))
.ok();
.expect("timestamp has too much drift!");
let mut timestamp = {
let mut clocks = self.timestamps.write().await;
*clocks.entry(op.instance).or_insert_with(|| op.timestamp)
};
if timestamp < op.timestamp {
timestamp = op.timestamp;
}
// read the timestamp for the operation's instance, or insert one if it doesn't exist
let timestamp = self.timestamps.write().await.get(&op.instance).cloned();
// copy some fields bc rust ownership
let op_instance = op.instance;
let op_timestamp = op.timestamp;
let is_old = self.compare_message(&op).await;
if !is_old {
if !self.is_operation_old(&op).await {
// actually go and apply the operation in the db
self.apply_op(op).await.ok();
self.timestamps.write().await.insert(op_instance, timestamp);
// update the stored timestamp for this instance - will be derived from the crdt operations table on restart
self.timestamps.write().await.insert(
op_instance,
NTP64::max(timestamp.unwrap_or_default(), op_timestamp),
);
}
}
@ -145,11 +147,13 @@ impl Actor {
self.db
._transaction()
.run(|db| async move {
// apply the operation to the actual record
ModelSyncData::from_op(op.clone())
.unwrap()
.exec(&db)
.await?;
// write the operation to the operations table
write_crdt_op_to_db(&op, &db).await?;
Ok(())
@ -161,7 +165,8 @@ impl Actor {
Ok(())
}
async fn compare_message(&mut self, op: &CRDTOperation) -> bool {
// determines if an operation is old and shouldn't be applied
async fn is_operation_old(&mut self, op: &CRDTOperation) -> bool {
let db = &self.db;
let old_timestamp = {