[ENG-1740] Add sempahore for generating sync operation timestamps (#2335)

add sempahore for generating sync operation timestamps
This commit is contained in:
Brendan Allan 2024-04-16 16:14:08 +08:00 committed by GitHub
parent 20b8a2b93b
commit b99a1adfca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 27 additions and 3 deletions

View file

@ -15,6 +15,8 @@ use crate::crdt_op_unchecked_db;
/// Takes all the syncable data in the database and generates CRDTOperations for it. /// Takes all the syncable data in the database and generates CRDTOperations for it.
/// This is a requirement before the library can sync. /// This is a requirement before the library can sync.
pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, instance_id: i32) { pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, instance_id: i32) {
let lock = sync.timestamp_lock.acquire().await;
db._transaction() db._transaction()
.with_timeout(9999999999) .with_timeout(9999999999)
.run(|db| async move { .run(|db| async move {
@ -412,6 +414,8 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
}) })
.await .await
.unwrap(); .unwrap();
drop(lock);
} }
async fn paginate< async fn paginate<

View file

@ -24,6 +24,7 @@ pub struct Manager {
pub tx: broadcast::Sender<SyncMessage>, pub tx: broadcast::Sender<SyncMessage>,
pub ingest: ingest::Handler, pub ingest: ingest::Handler,
pub shared: Arc<SharedState>, pub shared: Arc<SharedState>,
pub timestamp_lock: tokio::sync::Semaphore,
} }
impl fmt::Debug for Manager { impl fmt::Debug for Manager {
@ -68,7 +69,12 @@ impl Manager {
let ingest = ingest::Actor::spawn(shared.clone()); let ingest = ingest::Actor::spawn(shared.clone());
New { New {
manager: Self { tx, ingest, shared }, manager: Self {
tx,
ingest,
shared,
timestamp_lock: tokio::sync::Semaphore::new(1),
},
rx, rx,
} }
} }
@ -80,9 +86,15 @@ impl Manager {
pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>( pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>(
&self, &self,
tx: &PrismaClient, tx: &PrismaClient,
(_ops, queries): (Vec<CRDTOperation>, I), (mut _ops, queries): (Vec<CRDTOperation>, I),
) -> prisma_client_rust::Result<<I as prisma_client_rust::BatchItemParent>::ReturnValue> { ) -> prisma_client_rust::Result<<I as prisma_client_rust::BatchItemParent>::ReturnValue> {
let ret = if self.emit_messages_flag.load(atomic::Ordering::Relaxed) { let ret = if self.emit_messages_flag.load(atomic::Ordering::Relaxed) {
let lock = self.timestamp_lock.acquire().await;
_ops.iter_mut().for_each(|op| {
op.timestamp = *self.get_clock().new_timestamp().get_time();
});
let (res, _) = tx let (res, _) = tx
._batch(( ._batch((
queries, queries,
@ -94,6 +106,8 @@ impl Manager {
self.tx.send(SyncMessage::Created).ok(); self.tx.send(SyncMessage::Created).ok();
drop(lock);
res res
} else { } else {
tx._batch([queries]).await?.remove(0) tx._batch([queries]).await?.remove(0)
@ -106,14 +120,20 @@ impl Manager {
pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>( pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>(
&self, &self,
tx: &PrismaClient, tx: &PrismaClient,
op: CRDTOperation, mut op: CRDTOperation,
query: Q, query: Q,
) -> prisma_client_rust::Result<<Q as prisma_client_rust::BatchItemParent>::ReturnValue> { ) -> prisma_client_rust::Result<<Q as prisma_client_rust::BatchItemParent>::ReturnValue> {
let ret = if self.emit_messages_flag.load(atomic::Ordering::Relaxed) { let ret = if self.emit_messages_flag.load(atomic::Ordering::Relaxed) {
let lock = self.timestamp_lock.acquire().await;
op.timestamp = *self.get_clock().new_timestamp().get_time();
let ret = tx._batch((crdt_op_db(&op).to_query(tx), query)).await?.1; let ret = tx._batch((crdt_op_db(&op).to_query(tx), query)).await?.1;
self.tx.send(SyncMessage::Created).ok(); self.tx.send(SyncMessage::Created).ok();
drop(lock);
ret ret
} else { } else {
tx._batch(vec![query]).await?.remove(0) tx._batch(vec![query]).await?.remove(0)