[ENG-1691] Sync status subscription (#2246)

* sync status subscription

* clippy
This commit is contained in:
Brendan Allan 2024-03-26 15:26:37 +08:00 committed by GitHub
parent 51acd0fd8e
commit 94ca18925d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 218 additions and 57 deletions

View file

@ -1,4 +1,7 @@
use std::{ops::Deref, sync::Arc};
use std::{
ops::Deref,
sync::{atomic::Ordering, Arc},
};
use sd_prisma::{
prisma::{crdt_operation, SortOrder},
@ -61,8 +64,14 @@ impl Actor {
async fn tick(mut self) -> Option<Self> {
let state = match self.state.take()? {
State::WaitingForNotification => {
self.shared.active.store(false, Ordering::Relaxed);
self.shared.active_notify.notify_waiters();
wait!(self.io.event_rx, Event::Notification);
self.shared.active.store(true, Ordering::Relaxed);
self.shared.active_notify.notify_waiters();
State::RetrievingMessages
}
State::RetrievingMessages => {
@ -270,6 +279,8 @@ mod test {
clock: HLCBuilder::new().with_id(instance.into()).build(),
timestamps: Default::default(),
emit_messages_flag: Arc::new(AtomicBool::new(true)),
active: Default::default(),
active_notify: Default::default(),
});
(Actor::spawn(shared.clone()), shared)

View file

@ -32,6 +32,8 @@ pub struct SharedState {
pub instance: uuid::Uuid,
pub timestamps: Timestamps,
pub clock: uhlc::HLC,
pub active: AtomicBool,
pub active_notify: tokio::sync::Notify,
}
#[must_use]

View file

@ -22,7 +22,7 @@ use uuid::Uuid;
pub struct Manager {
pub tx: broadcast::Sender<SyncMessage>,
pub ingest: ingest::Handler,
shared: Arc<SharedState>,
pub shared: Arc<SharedState>,
}
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
@ -54,6 +54,8 @@ impl Manager {
clock,
timestamps: Arc::new(RwLock::new(timestamps)),
emit_messages_flag: emit_messages_flag.clone(),
active: Default::default(),
active_notify: Default::default(),
});
let ingest = ingest::Actor::spawn(shared.clone());

View file

@ -145,7 +145,9 @@ mod library {
for instance in instances {
crate::cloud::sync::receive::upsert_instance(
&library,
library.id,
&library.db,
&library.sync,
&node.libraries,
instance.uuid,
instance.identity,

View file

@ -1,8 +1,6 @@
use std::sync::atomic::Ordering;
use sd_core_sync::GetOpsArgs;
use rspc::alpha::AlphaRouter;
use sd_core_sync::GetOpsArgs;
use std::sync::atomic::Ordering;
use crate::util::MaybeUndefined;
@ -77,4 +75,25 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.load(Ordering::Relaxed))
})
})
.procedure("active", {
R.with2(library())
.subscription(|(_, library), _: ()| async move {
async_stream::stream! {
let cloud_sync = &library.cloud.sync;
let sync = &library.sync.shared;
loop {
yield sync.active.load(Ordering::Relaxed)
|| cloud_sync.send_active.load(Ordering::Relaxed)
|| cloud_sync.receive_active.load(Ordering::Relaxed)
|| cloud_sync.ingest_active.load(Ordering::Relaxed);
tokio::select! {
_ = cloud_sync.notifier.notified() => {},
_ = sync.active_notify.notified() => {}
}
}
}
})
})
}

View file

@ -1 +1,33 @@
use std::sync::Arc;
use uuid::Uuid;
use crate::Node;
pub mod sync;
#[derive(Default)]
pub struct State {
pub sync: sync::State,
}
pub async fn start(
node: &Arc<Node>,
actors: &Arc<sd_actors::Actors>,
library_id: Uuid,
instance_uuid: Uuid,
sync: &Arc<sd_core_sync::Manager>,
db: &Arc<sd_prisma::prisma::PrismaClient>,
) -> State {
let sync = sync::declare_actors(
node,
actors,
library_id,
instance_uuid,
sync.clone(),
db.clone(),
)
.await;
State { sync }
}

View file

@ -1,4 +1,7 @@
use std::sync::Arc;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio::sync::Notify;
use tracing::debug;
@ -7,8 +10,16 @@ use crate::cloud::sync::err_break;
// Responsible for taking sync operations received from the cloud,
// and applying them to the local database via the sync system's ingest actor.
pub async fn run_actor(sync: Arc<sd_core_sync::Manager>, notify: Arc<Notify>) {
pub async fn run_actor(
sync: Arc<sd_core_sync::Manager>,
notify: Arc<Notify>,
state: Arc<AtomicBool>,
state_notify: Arc<Notify>,
) {
loop {
state.store(true, Ordering::Relaxed);
state_notify.notify_waiters();
{
let mut rx = sync.ingest.req_rx.lock().await;
@ -65,6 +76,9 @@ pub async fn run_actor(sync: Arc<sd_core_sync::Manager>, notify: Arc<Notify>) {
}
}
state.store(false, Ordering::Relaxed);
state_notify.notify_waiters();
notify.notified().await;
}
}

View file

@ -1,16 +1,35 @@
use sd_sync::*;
use std::sync::{atomic, Arc};
use std::sync::{
atomic::{self, AtomicBool},
Arc,
};
use tokio::sync::Notify;
use uuid::Uuid;
use crate::{library::Library, Node};
use crate::Node;
pub mod ingest;
pub mod receive;
pub mod send;
pub async fn declare_actors(library: &Arc<Library>, node: &Arc<Node>) {
#[derive(Default)]
pub struct State {
pub send_active: Arc<AtomicBool>,
pub receive_active: Arc<AtomicBool>,
pub ingest_active: Arc<AtomicBool>,
pub notifier: Arc<Notify>,
}
pub async fn declare_actors(
node: &Arc<Node>,
actors: &Arc<sd_actors::Actors>,
library_id: Uuid,
instance_uuid: Uuid,
sync: Arc<sd_core_sync::Manager>,
db: Arc<sd_prisma::prisma::PrismaClient>,
) -> State {
let ingest_notify = Arc::new(Notify::new());
let actors = &library.actors;
let state = State::default();
let autorun = node.cloud_sync_flag.load(atomic::Ordering::Relaxed);
@ -18,10 +37,12 @@ pub async fn declare_actors(library: &Arc<Library>, node: &Arc<Node>) {
.declare(
"Cloud Sync Sender",
{
let library = library.clone();
let sync = sync.clone();
let node = node.clone();
let active = state.send_active.clone();
let active_notifier = state.notifier.clone();
move || send::run_actor(library.id, library.sync.clone(), node.clone())
move || send::run_actor(library_id, sync, node, active, active_notifier)
},
autorun,
)
@ -31,21 +52,23 @@ pub async fn declare_actors(library: &Arc<Library>, node: &Arc<Node>) {
.declare(
"Cloud Sync Receiver",
{
let library = library.clone();
let sync = sync.clone();
let node = node.clone();
let ingest_notify = ingest_notify.clone();
let active_notifier = state.notifier.clone();
let active = state.receive_active.clone();
move || {
receive::run_actor(
library.clone(),
node.libraries.clone(),
library.db.clone(),
library.id,
library.instance_uuid,
library.sync.clone(),
node.clone(),
db.clone(),
library_id,
instance_uuid,
sync,
ingest_notify,
node.clone(),
node,
active,
active_notifier,
)
}
},
@ -57,12 +80,16 @@ pub async fn declare_actors(library: &Arc<Library>, node: &Arc<Node>) {
.declare(
"Cloud Sync Ingest",
{
let library = library.clone();
move || ingest::run_actor(library.sync.clone(), ingest_notify)
let active = state.ingest_active.clone();
let active_notifier = state.notifier.clone();
move || ingest::run_actor(sync.clone(), ingest_notify, active, active_notifier)
},
autorun,
)
.await;
state
}
macro_rules! err_break {

View file

@ -1,11 +1,7 @@
use crate::{
library::{Libraries, Library},
Node,
};
use crate::{library::Libraries, Node};
use super::{err_break, CompressedCRDTOperations};
use sd_cloud_api::RequestConfigProvider;
use sd_core_sync::NTP64;
use sd_p2p::RemoteIdentity;
use sd_prisma::prisma::{cloud_crdt_operation, instance, PrismaClient, SortOrder};
use sd_sync::CRDTOperation;
@ -14,7 +10,10 @@ use tracing::{debug, info};
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
@ -28,17 +27,20 @@ use uuid::Uuid;
#[allow(clippy::too_many_arguments)]
pub async fn run_actor(
library: Arc<Library>,
libraries: Arc<Libraries>,
db: Arc<PrismaClient>,
library_id: Uuid,
instance_uuid: Uuid,
sync: Arc<sd_core_sync::Manager>,
cloud_api_config_provider: Arc<impl RequestConfigProvider>,
ingest_notify: Arc<Notify>,
node: Arc<Node>,
active: Arc<AtomicBool>,
active_notify: Arc<Notify>,
) {
loop {
active.store(true, Ordering::Relaxed);
active_notify.notify_waiters();
loop {
// We need to know the lastest operations we should be retrieving
let mut cloud_timestamps = {
@ -106,7 +108,7 @@ pub async fn run_actor(
let collections = err_break!(
sd_cloud_api::library::message_collections::get(
cloud_api_config_provider.get_request_config().await,
node.get_request_config().await,
library_id,
instance_uuid,
instance_timestamps,
@ -128,7 +130,7 @@ pub async fn run_actor(
None => {
let Some(fetched_library) = err_break!(
sd_cloud_api::library::get(
cloud_api_config_provider.get_request_config().await,
node.get_request_config().await,
library_id
)
.await
@ -157,7 +159,9 @@ pub async fn run_actor(
err_break!(
upsert_instance(
&library,
library_id,
&db,
&sync,
&libraries,
collection.instance_uuid,
instance.identity,
@ -200,6 +204,9 @@ pub async fn run_actor(
ingest_notify.notify_waiters();
}
active.store(false, Ordering::Relaxed);
active_notify.notify_waiters();
sleep(Duration::from_secs(60)).await;
}
}
@ -227,16 +234,16 @@ fn crdt_op_db(op: &CRDTOperation) -> cloud_crdt_operation::Create {
}
pub async fn upsert_instance(
library: &Arc<Library>,
library_id: Uuid,
db: &PrismaClient,
sync: &sd_core_sync::Manager,
libraries: &Libraries,
uuid: Uuid,
identity: RemoteIdentity,
node_id: Uuid,
metadata: HashMap<String, String>,
) -> prisma_client_rust::Result<()> {
library
.db
.instance()
db.instance()
.upsert(
instance::pub_id::equals(uuid_to_bytes(uuid)),
instance::create(
@ -254,16 +261,10 @@ pub async fn upsert_instance(
.exec()
.await?;
library
.sync
.timestamps
.write()
.await
.entry(uuid)
.or_default();
sync.timestamps.write().await.entry(uuid).or_default();
// Called again so the new instances are picked up
libraries.update_instances(library.clone()).await;
libraries.update_instances_by_id(library_id).await;
Ok(())
}

View file

@ -5,9 +5,15 @@ use sd_core_sync::{SyncMessage, NTP64};
use tracing::debug;
use uuid::Uuid;
use std::{sync::Arc, time::Duration};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::time::sleep;
use tokio::{sync::Notify, time::sleep};
use super::err_break;
@ -17,8 +23,13 @@ pub async fn run_actor(
library_id: Uuid,
sync: Arc<sd_core_sync::Manager>,
cloud_api_config_provider: Arc<impl RequestConfigProvider>,
state: Arc<AtomicBool>,
state_notify: Arc<Notify>,
) {
loop {
state.store(true, Ordering::Relaxed);
state_notify.notify_waiters();
loop {
// all available instances will have a default timestamp from create_instance
let instances = sync
@ -109,6 +120,9 @@ pub async fn run_actor(
);
}
state.store(false, Ordering::Relaxed);
state_notify.notify_waiters();
{
// recreate subscription each time so that existing messages are dropped
let mut rx = sync.subscribe();

View file

@ -1,4 +1,6 @@
use crate::{api::CoreEvent, object::media::old_thumbnail::get_indexed_thumbnail_path, sync, Node};
use crate::{
api::CoreEvent, cloud, object::media::old_thumbnail::get_indexed_thumbnail_path, sync, Node,
};
use sd_file_path_helper::{file_path_to_full_path, IsolatedFilePathData};
use sd_p2p::Identity;
@ -35,6 +37,7 @@ pub struct Library {
/// db holds the database client for the current library.
pub db: Arc<PrismaClient>,
pub sync: Arc<sync::Manager>,
pub cloud: cloud::State,
/// key manager that provides encryption keys to functions that require them
// pub key_manager: Arc<KeyManager>,
/// p2p identity
@ -76,12 +79,15 @@ impl Library {
db: Arc<PrismaClient>,
node: &Arc<Node>,
sync: Arc<sync::Manager>,
cloud: cloud::State,
do_cloud_sync: broadcast::Sender<()>,
actors: Arc<sd_actors::Actors>,
) -> Arc<Self> {
Arc::new(Self {
id,
config: RwLock::new(config),
sync,
cloud,
db: db.clone(),
// key_manager,
identity,
@ -90,7 +96,7 @@ impl Library {
do_cloud_sync,
env: node.env.clone(),
event_bus_tx: node.event_bus.0.clone(),
actors: Default::default(),
actors,
})
}

View file

@ -489,6 +489,11 @@ impl Libraries {
})
.collect()
});
let sync_manager = Arc::new(sync.manager);
let actors = Default::default();
let cloud = crate::cloud::start(node, &actors, id, instance_id, &sync_manager, &db).await;
let (tx, mut rx) = broadcast::channel(10);
let library = Library::new(
@ -499,16 +504,16 @@ impl Libraries {
// key_manager,
db,
node,
Arc::new(sync.manager),
sync_manager,
cloud,
tx,
actors,
)
.await;
// This is an exception. Generally subscribe to this by `self.tx.subscribe`.
tokio::spawn(sync_rx_actor(library.clone(), node.clone(), sync.rx));
crate::cloud::sync::declare_actors(&library, node).await;
self.tx
.emit(LibraryManagerEvent::Load(library.clone()))
.await;
@ -611,7 +616,9 @@ impl Libraries {
for instance in lib.instances {
if let Err(err) = cloud::sync::receive::upsert_instance(
&library,
library.id,
&library.db,
&library.sync,
&node.libraries,
instance.uuid,
instance.identity,
@ -664,6 +671,17 @@ impl Libraries {
.emit(LibraryManagerEvent::InstancesModified(library))
.await;
}
pub async fn update_instances_by_id(&self, library_id: Uuid) {
let Some(library) = self.libraries.read().await.get(&library_id).cloned() else {
warn!("Failed to find instance to update by id");
return;
};
self.tx
.emit(LibraryManagerEvent::InstancesModified(library))
.await;
}
}
async fn sync_rx_actor(

View file

@ -1,6 +1,6 @@
import { Gear } from '@phosphor-icons/react';
import { useNavigate } from 'react-router';
import { JobManagerContextProvider, useClientContext, useDebugState } from '@sd/client';
import { JobManagerContextProvider, LibraryContextProvider, useClientContext, useDebugState, useLibrarySubscription } from '@sd/client';
import { Button, ButtonLink, Popover, Tooltip, usePopover } from '@sd/ui';
import { useKeysMatcher, useLocale, useShortcut } from '~/hooks';
import { usePlatform } from '~/util/Platform';
@ -8,6 +8,7 @@ import { usePlatform } from '~/util/Platform';
import DebugPopover from '../DebugPopover';
import { IsRunningJob, JobManager } from '../JobManager';
import FeedbackButton from './FeedbackButton';
import { useState } from 'react';
export default () => {
const { library } = useClientContext();
@ -44,6 +45,7 @@ export default () => {
)}
</>
)}
{library && <LibraryContextProvider library={library}><SyncStatusIndicator/></LibraryContextProvider>}
<div className="flex w-full items-center justify-between">
<div className="flex">
<ButtonLink
@ -95,3 +97,13 @@ export default () => {
</div>
);
};
function SyncStatusIndicator() {
const [syncing, setSyncing] = useState(false);
useLibrarySubscription(["sync.active"], {
onData: setSyncing
})
return null
}

View file

@ -136,6 +136,7 @@ export type Procedures = {
{ key: "notifications.listen", input: never, result: Notification } |
{ key: "p2p.events", input: never, result: P2PEvent } |
{ key: "search.ephemeralPaths", input: LibraryArgs<EphemeralPathSearchArgs>, result: EphemeralPathsResultItem } |
{ key: "sync.active", input: LibraryArgs<null>, result: boolean } |
{ key: "sync.newMessage", input: LibraryArgs<null>, result: null }
};