[ENG-1674] fix request drop test and add write_ops test (#2252)

fix request drop test and add write_ops test
This commit is contained in:
Brendan Allan 2024-03-27 14:40:57 +08:00 committed by GitHub
parent b69733ffd9
commit 902ed24ffd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 207 additions and 200 deletions

View file

@ -265,7 +265,7 @@ impl ActorTypes for Actor {
#[cfg(test)]
mod test {
use std::sync::atomic::AtomicBool;
use std::{sync::atomic::AtomicBool, time::Duration};
use uhlc::HLCBuilder;
@ -292,7 +292,7 @@ mod test {
async fn messages_request_drop() -> Result<(), ()> {
let (ingest, _) = new_actor().await;
for _ in [(), ()] {
for _ in 0..10 {
let mut rx = ingest.req_rx.lock().await;
println!("lock acquired");
@ -305,28 +305,12 @@ mod test {
panic!("bruh")
};
println!("message received")
println!("message received");
// without this the test hangs, idk
tokio::time::sleep(Duration::from_millis(0)).await;
}
Ok(())
}
// /// If messages tx is dropped, actor should reset and assume no further messages
// /// will be sent
// #[tokio::test]
// async fn retrieve_wait() -> Result<(), ()> {
// let (ingest, _) = new_actor().await;
// for _ in [(), ()] {
// let mut rx = ingest.req_rx.lock().await;
// ingest.event_tx.send(Event::Notification).await.unwrap();
// let Some(Request::Messages { .. }) = rx.recv().await else {
// panic!("bruh")
// };
// }
// Ok(())
// }
}

View file

@ -1,203 +1,84 @@
mod mock_instance;
use sd_core_sync::*;
use sd_prisma::{prisma, prisma_sync};
use sd_sync::*;
use sd_utils::uuid_to_bytes;
use prisma_client_rust::chrono::Utc;
use std::sync::{atomic::AtomicBool, Arc};
use tokio::sync::broadcast;
use mock_instance::Instance;
use uuid::Uuid;
fn db_path(id: Uuid) -> String {
format!("/tmp/test-{id}.db")
}
#[derive(Clone)]
struct Instance {
id: Uuid,
db: Arc<prisma::PrismaClient>,
sync: Arc<sd_core_sync::Manager>,
}
impl Instance {
async fn new(id: Uuid) -> (Arc<Self>, broadcast::Receiver<SyncMessage>) {
let url = format!("file:{}", db_path(id));
println!("new -1: {url}");
let db = Arc::new(
prisma::PrismaClient::_builder()
.with_url(url.to_string())
.build()
.await
.unwrap(),
);
println!("new 0: {url}");
db._db_push().await.unwrap();
println!("new 1");
db.instance()
.create(
uuid_to_bytes(id),
vec![],
vec![],
Utc::now().into(),
Utc::now().into(),
vec![],
)
.exec()
.await
.unwrap();
println!("new 2");
let sync = sd_core_sync::Manager::new(
&db,
id,
&Arc::new(AtomicBool::new(true)),
Default::default(),
);
(
Arc::new(Self {
id,
db,
sync: Arc::new(sync.manager),
}),
sync.rx,
)
}
async fn teardown(&self) {
tokio::fs::remove_file(db_path(self.id)).await.unwrap();
}
async fn pair(left: &Self, right: &Self) {
left.db
.instance()
.create(
uuid_to_bytes(right.id),
vec![],
vec![],
Utc::now().into(),
Utc::now().into(),
vec![],
)
.exec()
.await
.unwrap();
right
.db
.instance()
.create(
uuid_to_bytes(left.id),
vec![],
vec![],
Utc::now().into(),
Utc::now().into(),
vec![],
)
.exec()
.await
.unwrap();
}
}
#[tokio::test]
async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
let (instance1, mut sync_rx1) = Instance::new(Uuid::new_v4()).await;
let (instance2, mut sync_rx2) = Instance::new(Uuid::new_v4()).await;
Instance::pair(&instance1, &instance2).await;
let task_1 = tokio::spawn({
let _instance1 = instance1.clone();
let instance2 = instance2.clone();
async move {
while let Ok(msg) = sync_rx1.recv().await {
if matches!(msg, SyncMessage::Created) {
instance2
.sync
.ingest
.event_tx
.send(ingest::Event::Notification)
.await
.unwrap();
}
}
}
});
let task_2 = tokio::spawn({
let instance1 = instance1.clone();
let instance2 = instance2.clone();
async move {
while let Some(msg) = instance2.sync.ingest.req_rx.lock().await.recv().await {
match msg {
ingest::Request::Messages { timestamps, .. } => {
let messages = instance1
.sync
.get_ops(GetOpsArgs {
clocks: timestamps,
count: 100,
})
.await
.unwrap();
let ingest = &instance2.sync.ingest;
ingest
.event_tx
.send(ingest::Event::Messages(ingest::MessagesEvent {
messages,
has_more: false,
instance_id: instance1.id,
}))
.await
.unwrap();
}
ingest::Request::Ingested => {
instance2.sync.tx.send(SyncMessage::Ingested).ok();
}
_ => todo!(),
}
}
}
});
instance1
async fn write_test_location(instance: &Instance) -> Result<(), Box<dyn std::error::Error>> {
instance
.sync
.write_ops(&instance1.db, {
.write_ops(&instance.db, {
let id = Uuid::new_v4();
use prisma::location;
let (sync_ops, db_ops): (Vec<_>, Vec<_>) = [
sync_db_entry!("Location 0".to_string(), location::name),
sync_db_entry!("/User/Brendan/Documents".to_string(), location::path),
sync_db_entry!("Location 0".to_string(), prisma::location::name),
sync_db_entry!(
"/User/Brendan/Documents".to_string(),
prisma::location::path
),
]
.into_iter()
.unzip();
(
instance1.sync.shared_create(
instance.sync.shared_create(
prisma_sync::location::SyncId {
pub_id: uuid_to_bytes(id),
},
sync_ops,
),
instance1.db.location().create(uuid_to_bytes(id), db_ops),
instance.db.location().create(uuid_to_bytes(id), db_ops),
)
})
.await?;
assert!(matches!(sync_rx2.recv().await?, SyncMessage::Ingested));
Ok(())
}
#[tokio::test]
async fn writes_operations_and_rows_together() -> Result<(), Box<dyn std::error::Error>> {
let instance = Instance::new(Uuid::new_v4()).await;
write_test_location(&instance).await?;
let operations = instance
.db
.crdt_operation()
.find_many(vec![])
.exec()
.await?;
// 1 create, 2 update
assert_eq!(operations.len(), 3);
assert_eq!(operations[0].model, prisma::location::NAME);
let locations = instance.db.location().find_many(vec![]).exec().await?;
assert_eq!(locations.len(), 1);
let location = locations.first().unwrap();
assert_eq!(location.name, Some("Location 0".to_string()));
assert_eq!(location.path, Some("/User/Brendan/Documents".to_string()));
Ok(())
}
#[tokio::test]
async fn operations_send_and_ingest() -> Result<(), Box<dyn std::error::Error>> {
let instance1 = Instance::new(Uuid::new_v4()).await;
let instance2 = Instance::new(Uuid::new_v4()).await;
Instance::pair(&instance1, &instance2).await;
write_test_location(&instance1).await?;
assert!(matches!(
instance2.sync_rx.resubscribe().recv().await?,
SyncMessage::Ingested
));
let out = instance2
.sync
@ -212,8 +93,5 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
instance1.teardown().await;
instance2.teardown().await;
task_1.abort();
task_2.abort();
Ok(())
}

View file

@ -0,0 +1,145 @@
use sd_core_sync::*;
use sd_prisma::prisma;
use sd_utils::uuid_to_bytes;
use prisma_client_rust::chrono::Utc;
use std::sync::{atomic::AtomicBool, Arc};
use tokio::sync::broadcast;
use uuid::Uuid;
fn db_path(id: Uuid) -> String {
format!("/tmp/test-{id}.db")
}
#[derive(Clone)]
pub struct Instance {
pub id: Uuid,
pub db: Arc<prisma::PrismaClient>,
pub sync: Arc<sd_core_sync::Manager>,
pub sync_rx: Arc<broadcast::Receiver<SyncMessage>>,
}
impl Instance {
pub async fn new(id: Uuid) -> Arc<Self> {
let url = format!("file:{}", db_path(id));
let db = Arc::new(
prisma::PrismaClient::_builder()
.with_url(url.to_string())
.build()
.await
.unwrap(),
);
db._db_push().await.unwrap();
db.instance()
.create(
uuid_to_bytes(id),
vec![],
vec![],
Utc::now().into(),
Utc::now().into(),
vec![],
)
.exec()
.await
.unwrap();
let sync = sd_core_sync::Manager::new(
&db,
id,
&Arc::new(AtomicBool::new(true)),
Default::default(),
);
Arc::new(Self {
id,
db,
sync: Arc::new(sync.manager),
sync_rx: Arc::new(sync.rx),
})
}
pub async fn teardown(&self) {
tokio::fs::remove_file(db_path(self.id)).await.unwrap();
}
pub async fn pair(left: &Arc<Self>, right: &Arc<Self>) {
async fn half(left: &Arc<Instance>, right: &Arc<Instance>) {
left.db
.instance()
.create(
uuid_to_bytes(right.id),
vec![],
vec![],
Utc::now().into(),
Utc::now().into(),
vec![],
)
.exec()
.await
.unwrap();
tokio::spawn({
let mut sync_rx_1 = left.sync_rx.resubscribe();
let instance2 = right.clone();
async move {
while let Ok(msg) = sync_rx_1.recv().await {
if matches!(msg, SyncMessage::Created) {
instance2
.sync
.ingest
.event_tx
.send(ingest::Event::Notification)
.await
.unwrap();
}
}
}
});
tokio::spawn({
let instance1 = left.clone();
let instance2 = right.clone();
async move {
while let Some(msg) = instance2.sync.ingest.req_rx.lock().await.recv().await {
match msg {
ingest::Request::Messages { timestamps, .. } => {
let messages = instance1
.sync
.get_ops(GetOpsArgs {
clocks: timestamps,
count: 100,
})
.await
.unwrap();
let ingest = &instance2.sync.ingest;
ingest
.event_tx
.send(ingest::Event::Messages(ingest::MessagesEvent {
messages,
has_more: false,
instance_id: instance1.id,
}))
.await
.unwrap();
}
ingest::Request::Ingested => {
instance2.sync.tx.send(SyncMessage::Ingested).ok();
}
ingest::Request::FinishedIngesting => {}
}
}
}
});
}
half(left, right).await;
half(right, left).await;
}
}