From 3cabc9c3a9c29c91d24116f16c83fb0dbc815058 Mon Sep 17 00:00:00 2001 From: Brendan Allan Date: Thu, 14 Dec 2023 01:49:22 +0800 Subject: [PATCH] Basic actor manager (#1888) * basic declared actor manager * put actors in separate file * clippy * hopefully clean up some clippy warnings --------- Co-authored-by: jake <77554505+brxken128@users.noreply.github.com> --- core/crates/sync/src/ingest.rs | 7 +- core/crates/sync/src/manager.rs | 1 + core/src/api/cloud.rs | 7 +- core/src/api/libraries.rs | 34 +++++ core/src/api/p2p.rs | 24 ++-- core/src/api/volumes.rs | 3 +- core/src/cloud/sync/ingest.rs | 75 ++++++----- core/src/cloud/sync/mod.rs | 36 +++-- core/src/cloud/sync/receive.rs | 67 +++++----- core/src/cloud/sync/send.rs | 22 +--- core/src/library/actors.rs | 123 ++++++++++++++++++ core/src/library/library.rs | 5 +- core/src/library/manager/mod.rs | 9 +- core/src/library/mod.rs | 2 + crates/cloud-api/src/lib.rs | 8 +- crates/crypto/src/keys/keyring/linux.rs | 4 +- .../$libraryId/Layout/Sidebar/Contents.tsx | 30 +++-- interface/app/$libraryId/debug/actors.tsx | 68 ++++++++++ .../app/$libraryId/{ => debug}/cloud.tsx | 0 interface/app/$libraryId/debug/index.ts | 9 +- interface/app/$libraryId/{ => debug}/sync.tsx | 0 interface/app/$libraryId/index.tsx | 12 +- interface/app/onboarding/new-library.tsx | 2 +- packages/client/src/core.ts | 3 + packages/client/src/hooks/useFeatureFlag.tsx | 2 +- 25 files changed, 402 insertions(+), 151 deletions(-) create mode 100644 core/src/library/actors.rs create mode 100644 interface/app/$libraryId/debug/actors.tsx rename interface/app/$libraryId/{ => debug}/cloud.tsx (100%) rename interface/app/$libraryId/{ => debug}/sync.tsx (100%) diff --git a/core/crates/sync/src/ingest.rs b/core/crates/sync/src/ingest.rs index 7cf66408d..a09a64b73 100644 --- a/core/crates/sync/src/ingest.rs +++ b/core/crates/sync/src/ingest.rs @@ -257,13 +257,10 @@ async fn write_crdt_op_to_db( ) -> Result<(), prisma_client_rust::QueryError> { match &op.typ { CRDTOperationType::Shared(shared_op) => { - shared_op_db(&op, shared_op).to_query(&db).exec().await?; + shared_op_db(op, shared_op).to_query(db).exec().await?; } CRDTOperationType::Relation(relation_op) => { - relation_op_db(&op, relation_op) - .to_query(&db) - .exec() - .await?; + relation_op_db(op, relation_op).to_query(db).exec().await?; } } diff --git a/core/crates/sync/src/manager.rs b/core/crates/sync/src/manager.rs index 4b27edf00..4be17fde9 100644 --- a/core/crates/sync/src/manager.rs +++ b/core/crates/sync/src/manager.rs @@ -38,6 +38,7 @@ pub struct New { } impl Manager { + #[allow(clippy::new_ret_no_self)] pub fn new( db: &Arc, instance: Uuid, diff --git a/core/src/api/cloud.rs b/core/src/api/cloud.rs index 31578417b..d942c0701 100644 --- a/core/src/api/cloud.rs +++ b/core/src/api/cloud.rs @@ -59,7 +59,12 @@ mod library { .libraries .create_with_uuid( library_id, - LibraryName::new(cloud_library.name).unwrap(), + LibraryName::new(cloud_library.name).map_err(|e| { + rspc::Error::new( + rspc::ErrorCode::InternalServerError, + e.to_string(), + ) + })?, None, false, None, diff --git a/core/src/api/libraries.rs b/core/src/api/libraries.rs index 9394ea626..7064dcf33 100644 --- a/core/src/api/libraries.rs +++ b/core/src/api/libraries.rs @@ -323,4 +323,38 @@ pub(crate) fn mount() -> AlphaRouter { node.libraries.delete(&id).await.map_err(Into::into) }), ) + .procedure( + "actors", + R.with2(library()).subscription(|(_, library), _: ()| { + let mut rx = library.actors.invalidate_rx.resubscribe(); + + async_stream::stream! 
{ + let actors = library.actors.get_state().await; + yield actors; + + while let Ok(()) = rx.recv().await { + let actors = library.actors.get_state().await; + yield actors; + } + } + }), + ) + .procedure( + "startActor", + R.with2(library()) + .mutation(|(_, library), name: String| async move { + library.actors.start(&name).await; + + Ok(()) + }), + ) + .procedure( + "stopActor", + R.with2(library()) + .mutation(|(_, library), name: String| async move { + library.actors.stop(&name).await; + + Ok(()) + }), + ) } diff --git a/core/src/api/p2p.rs b/core/src/api/p2p.rs index 28bad7051..8f0cf0774 100644 --- a/core/src/api/p2p.rs +++ b/core/src/api/p2p.rs @@ -26,17 +26,12 @@ pub(crate) fn mount() -> AlphaRouter { } // TODO: Don't block subscription start - for identity in node - .p2p - .manager - .get_connected_peers() - .await - .map_err(|_err| { - rspc::Error::new( - ErrorCode::InternalServerError, - "todo: error getting connected peers".into(), - ) - })? { + for identity in node.p2p.manager.get_connected_peers().await.map_err(|_| { + rspc::Error::new( + ErrorCode::InternalServerError, + "todo: error getting connected peers".into(), + ) + })? { queued.push(P2PEvent::ConnectedPeer { identity }); } @@ -88,7 +83,11 @@ pub(crate) fn mount() -> AlphaRouter { }) }) .procedure("cancelSpacedrop", { - R.mutation(|node, id: Uuid| async move { Ok(node.p2p.cancel_spacedrop(id).await) }) + R.mutation(|node, id: Uuid| async move { + node.p2p.cancel_spacedrop(id).await; + + Ok(()) + }) }) .procedure("pair", { R.mutation(|node, id: RemoteIdentity| async move { @@ -98,6 +97,7 @@ pub(crate) fn mount() -> AlphaRouter { .procedure("pairingResponse", { R.mutation(|node, (pairing_id, decision): (u16, PairingDecision)| { node.p2p.pairing.decision(pairing_id, decision); + Ok(()) }) }) diff --git a/core/src/api/volumes.rs b/core/src/api/volumes.rs index cf4aee31d..550bc3f01 100644 --- a/core/src/api/volumes.rs +++ b/core/src/api/volumes.rs @@ -15,8 +15,7 @@ pub(crate) fn mount() -> AlphaRouter { blake3::hash( &i.mount_points .iter() - .map(|mp| mp.as_os_str().to_string_lossy().as_bytes().to_vec()) - .flatten() + .flat_map(|mp| mp.as_os_str().to_string_lossy().as_bytes().to_vec()) .collect::>(), ) .to_hex() diff --git a/core/src/cloud/sync/ingest.rs b/core/src/cloud/sync/ingest.rs index 046878b23..49ed997ad 100644 --- a/core/src/cloud/sync/ingest.rs +++ b/core/src/cloud/sync/ingest.rs @@ -1,47 +1,54 @@ +use crate::cloud::sync::err_return; + use super::Library; use std::sync::Arc; use tokio::sync::Notify; -pub async fn run_actor(library: Arc, notify: Arc) { +pub async fn run_actor((library, notify): (Arc, Arc)) { let Library { sync, .. 
} = library.as_ref(); loop { - let mut rx = sync.ingest.req_rx.lock().await; + { + let mut rx = sync.ingest.req_rx.lock().await; - sync.ingest - .event_tx - .send(sd_core_sync::Event::Notification) - .await - .unwrap(); - - use crate::sync::ingest::*; - - while let Some(req) = rx.recv().await { - const OPS_PER_REQUEST: u32 = 1000; - - let timestamps = match req { - Request::FinishedIngesting => break, - Request::Messages { timestamps } => timestamps, - _ => continue, - }; - - let ops = sync - .get_cloud_ops(crate::sync::GetOpsArgs { - clocks: timestamps, - count: OPS_PER_REQUEST, - }) - .await - .unwrap(); - - sync.ingest + if sync + .ingest .event_tx - .send(sd_core_sync::Event::Messages(MessagesEvent { - instance_id: library.sync.instance, - has_more: ops.len() == 1000, - messages: ops, - })) + .send(sd_core_sync::Event::Notification) .await - .unwrap(); + .is_ok() + { + use crate::sync::ingest::*; + + while let Some(req) = rx.recv().await { + const OPS_PER_REQUEST: u32 = 1000; + + let timestamps = match req { + Request::FinishedIngesting => break, + Request::Messages { timestamps } => timestamps, + _ => continue, + }; + + let ops = err_return!( + sync.get_cloud_ops(crate::sync::GetOpsArgs { + clocks: timestamps, + count: OPS_PER_REQUEST, + }) + .await + ); + + err_return!( + sync.ingest + .event_tx + .send(sd_core_sync::Event::Messages(MessagesEvent { + instance_id: library.sync.instance, + has_more: ops.len() == 1000, + messages: ops, + })) + .await + ); + } + } } notify.notified().await; diff --git a/core/src/cloud/sync/mod.rs b/core/src/cloud/sync/mod.rs index 58fab6eed..d14ed8b3f 100644 --- a/core/src/cloud/sync/mod.rs +++ b/core/src/cloud/sync/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{atomic, Arc}; use crate::{library::Library, Node}; @@ -6,16 +6,34 @@ mod ingest; mod receive; mod send; -pub fn spawn_actors(library: &Arc, node: &Arc) { +pub async fn declare_actors(library: &Arc, node: &Arc) { let ingest_notify = Arc::new(Notify::new()); + let actors = &library.actors; - tokio::spawn(send::run_actor(library.clone(), node.clone())); - tokio::spawn(receive::run_actor( - library.clone(), - node.clone(), - ingest_notify.clone(), - )); - tokio::spawn(ingest::run_actor(library.clone(), ingest_notify)); + let autorun = node.cloud_sync_flag.load(atomic::Ordering::Relaxed); + + let args = (library.clone(), node.clone()); + actors + .declare("Cloud Sync Sender", move || send::run_actor(args), autorun) + .await; + + let args = (library.clone(), node.clone(), ingest_notify.clone()); + actors + .declare( + "Cloud Sync Receiver", + move || receive::run_actor(args), + autorun, + ) + .await; + + let args = (library.clone(), ingest_notify); + actors + .declare( + "Cloud Sync Ingest", + move || ingest::run_actor(args), + autorun, + ) + .await; } macro_rules! 
err_break { diff --git a/core/src/cloud/sync/receive.rs b/core/src/cloud/sync/receive.rs index b5f442b2a..f2339f7be 100644 --- a/core/src/cloud/sync/receive.rs +++ b/core/src/cloud/sync/receive.rs @@ -14,14 +14,15 @@ use sd_sync::*; use sd_utils::{from_bytes_to_uuid, uuid_to_bytes}; use serde::Deserialize; use serde_json::{json, to_vec}; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::{hash_map::Entry, HashMap}, + sync::Arc, + time::Duration, +}; use tokio::{sync::Notify, time::sleep}; -use tracing::debug; use uuid::Uuid; -pub async fn run_actor(library: Arc, node: Arc, ingest_notify: Arc) { - debug!("receive actor running"); - +pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, Arc)) { let db = &library.db; let api_url = &library.env.api_url; let library_id = library.id; @@ -45,7 +46,9 @@ pub async fn run_actor(library: Arc, node: Arc, ingest_notify: Ar .zip(timestamps.keys()) .map(|(d, id)| { let cloud_timestamp = NTP64(d.map(|d| d.timestamp).unwrap_or_default() as u64); - let sync_timestamp = *timestamps.get(id).unwrap(); + let sync_timestamp = *timestamps + .get(id) + .expect("unable to find matching timestamp"); let max_timestamp = Ord::max(cloud_timestamp, sync_timestamp); @@ -85,28 +88,27 @@ pub async fn run_actor(library: Arc, node: Arc, ingest_notify: Ar } { - let collections = err_break!( - err_break!( - node.authed_api_request( - node.http - .post(&format!( - "{api_url}/api/v1/libraries/{library_id}/messageCollections/get" - )) - .json(&json!({ - "instanceUuid": library.instance_uuid, - "timestamps": instances - })), - ) - .await + let collections = node + .authed_api_request( + node.http + .post(&format!( + "{api_url}/api/v1/libraries/{library_id}/messageCollections/get" + )) + .json(&json!({ + "instanceUuid": library.instance_uuid, + "timestamps": instances + })), ) + .await + .expect("couldn't get response") .json::>() .await - ); + .expect("couldn't deserialize response"); let mut cloud_library_data: Option> = None; for collection in collections { - if !cloud_timestamps.contains_key(&collection.instance_uuid) { + if let Entry::Vacant(e) = cloud_timestamps.entry(collection.instance_uuid) { let fetched_library = match &cloud_library_data { None => { let Some(fetched_library) = err_break!( @@ -122,7 +124,7 @@ pub async fn run_actor(library: Arc, node: Arc, ingest_notify: Ar cloud_library_data .insert(Some(fetched_library)) .as_ref() - .unwrap() + .expect("error inserting fetched library") } Some(None) => { break; @@ -140,14 +142,14 @@ pub async fn run_actor(library: Arc, node: Arc, ingest_notify: Ar err_break!( create_instance( - &db, + db, collection.instance_uuid, err_break!(BASE64_STANDARD.decode(instance.identity.clone())) ) .await ); - cloud_timestamps.insert(collection.instance_uuid, NTP64(0)); + e.insert(NTP64(0)); } err_break!( @@ -160,7 +162,8 @@ pub async fn run_actor(library: Arc, node: Arc, ingest_notify: Ar .await ); - let collection_timestamp = NTP64(collection.end_time.parse().unwrap()); + let collection_timestamp = + NTP64(collection.end_time.parse().expect("unable to parse time")); let timestamp = cloud_timestamps .entry(collection.instance_uuid) @@ -184,10 +187,10 @@ async fn write_cloud_ops_to_db( ) -> Result<(), prisma_client_rust::QueryError> { let (shared, relation): (Vec<_>, Vec<_>) = ops.into_iter().partition_map(|op| match &op.typ { CRDTOperationType::Shared(shared_op) => { - Either::Left(shared_op_db(&op, &shared_op).to_query(&db)) + Either::Left(shared_op_db(&op, shared_op).to_query(db)) } 
CRDTOperationType::Relation(relation_op) => { - Either::Right(relation_op_db(&op, &relation_op).to_query(&db)) + Either::Right(relation_op_db(&op, relation_op).to_query(db)) } }); @@ -202,9 +205,9 @@ fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> cloud_shared timestamp: op.timestamp.0 as i64, instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()), kind: shared_op.kind().to_string(), - data: to_vec(&shared_op.data).unwrap(), + data: to_vec(&shared_op.data).expect("unable to serialize data"), model: shared_op.model.to_string(), - record_id: to_vec(&shared_op.record_id).unwrap(), + record_id: to_vec(&shared_op.record_id).expect("unable to serialize record id"), _params: vec![], } } @@ -218,10 +221,10 @@ fn relation_op_db( timestamp: op.timestamp.0 as i64, instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()), kind: relation_op.kind().to_string(), - data: to_vec(&relation_op.data).unwrap(), + data: to_vec(&relation_op.data).expect("unable to serialize data"), relation: relation_op.relation.to_string(), - item_id: to_vec(&relation_op.relation_item).unwrap(), - group_id: to_vec(&relation_op.relation_group).unwrap(), + item_id: to_vec(&relation_op.relation_item).expect("unable to serialize item id"), + group_id: to_vec(&relation_op.relation_group).expect("unable to serialize group id"), _params: vec![], } } diff --git a/core/src/cloud/sync/send.rs b/core/src/cloud/sync/send.rs index b1b77c5b5..cdc32d608 100644 --- a/core/src/cloud/sync/send.rs +++ b/core/src/cloud/sync/send.rs @@ -9,16 +9,12 @@ use std::{sync::Arc, time::Duration}; use tokio::time::sleep; use uuid::Uuid; -pub async fn run_actor(library: Arc, node: Arc) { +pub async fn run_actor((library, node): (Arc, Arc)) { let db = &library.db; let api_url = &library.env.api_url; let library_id = library.id; loop { - println!("send_actor run"); - - println!("send_actor sending"); - loop { let instances = err_break!( db.instance() @@ -55,8 +51,6 @@ pub async fn run_actor(library: Arc, node: Arc) { .await ); - println!("Add Requests: {req_adds:#?}"); - let mut instances = vec![]; for req_add in req_adds { @@ -72,14 +66,14 @@ pub async fn run_actor(library: Arc, node: Arc) { .from_time .unwrap_or_else(|| "0".to_string()) .parse() - .unwrap(), + .expect("couldn't parse ntp64 value"), ), )], }) .await ); - if ops.len() == 0 { + if ops.is_empty() { continue; } @@ -100,11 +94,11 @@ pub async fn run_actor(library: Arc, node: Arc) { "Number of messages: {}", instances .iter() - .map(|i| i["contents"].as_array().unwrap().len()) + .map(|i| i["contents"].as_array().expect("no contents found").len()) .sum::() ); - if instances.len() == 0 { + if instances.is_empty() { break; } @@ -115,7 +109,7 @@ pub async fn run_actor(library: Arc, node: Arc) { // from_time: String, } - let responses = err_break!( + let _responses = err_break!( err_break!( node.authed_api_request( node.http @@ -129,8 +123,6 @@ pub async fn run_actor(library: Arc, node: Arc) { .json::>() .await ); - - println!("DoAdd Responses: {responses:#?}"); } { @@ -145,8 +137,6 @@ pub async fn run_actor(library: Arc, node: Arc) { } } - println!("send_actor sleeping"); - sleep(Duration::from_millis(1000)).await; } } diff --git a/core/src/library/actors.rs b/core/src/library/actors.rs new file mode 100644 index 000000000..7e659802b --- /dev/null +++ b/core/src/library/actors.rs @@ -0,0 +1,123 @@ +use futures::Future; +use std::{collections::HashMap, pin::Pin, sync::Arc}; +use tokio::{ + sync::{broadcast, oneshot, Mutex}, + task::AbortHandle, +}; + +pub 
struct Actor { + pub abort_handle: Mutex>, + pub spawn_fn: Arc Pin + Send>> + Send + Sync>, +} + +pub struct Actors { + pub invalidate_rx: broadcast::Receiver<()>, + invalidate_tx: broadcast::Sender<()>, + actors: Arc>>>, +} + +impl Actors { + pub async fn declare + Send + 'static>( + self: &Arc, + name: &str, + actor_fn: impl FnOnce() -> F + Send + Sync + Clone + 'static, + autostart: bool, + ) { + let mut actors = self.actors.lock().await; + + actors.insert( + name.to_string(), + Arc::new(Actor { + abort_handle: Default::default(), + spawn_fn: Arc::new(move || Box::pin((actor_fn.clone())()) as Pin>), + }), + ); + + if autostart { + self.start(name).await; + } + } + + pub async fn start(self: &Arc, name: &str) { + let name = name.to_string(); + let actors = self.actors.lock().await; + + let Some(actor) = actors.get(&name).cloned() else { + return; + }; + + let mut abort_handle = actor.abort_handle.lock().await; + if abort_handle.is_some() { + return; + } + + let (tx, rx) = oneshot::channel(); + + let invalidate_tx = self.invalidate_tx.clone(); + + let spawn_fn = actor.spawn_fn.clone(); + + let task = tokio::spawn(async move { + (spawn_fn)().await; + + tx.send(()).ok(); + }); + + *abort_handle = Some(task.abort_handle()); + invalidate_tx.send(()).ok(); + + tokio::spawn({ + let actor = actor.clone(); + async move { + #[allow(clippy::match_single_binding)] + match rx.await { + _ => {} + }; + + actor.abort_handle.lock().await.take(); + invalidate_tx.send(()).ok(); + } + }); + } + + pub async fn stop(self: &Arc, name: &str) { + let name = name.to_string(); + let actors = self.actors.lock().await; + + let Some(actor) = actors.get(&name).cloned() else { + return; + }; + + let mut abort_handle = actor.abort_handle.lock().await; + + if let Some(abort_handle) = abort_handle.take() { + abort_handle.abort(); + } + } + + pub async fn get_state(&self) -> HashMap { + let actors = self.actors.lock().await; + + let mut state = HashMap::new(); + + for (name, actor) in &*actors { + state.insert(name.to_string(), actor.abort_handle.lock().await.is_some()); + } + + state + } +} + +impl Default for Actors { + fn default() -> Self { + let actors = Default::default(); + + let (invalidate_tx, invalidate_rx) = broadcast::channel(1); + + Self { + actors, + invalidate_rx, + invalidate_tx, + } + } +} diff --git a/core/src/library/library.rs b/core/src/library/library.rs index ff2d83e38..00bea3fd5 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -27,7 +27,7 @@ use tokio::{fs, io, sync::broadcast, sync::RwLock}; use tracing::warn; use uuid::Uuid; -use super::{LibraryConfig, LibraryManagerError}; +use super::{Actors, LibraryConfig, LibraryManagerError}; // TODO: Finish this // pub enum LibraryNew { @@ -60,6 +60,8 @@ pub struct Library { // Look, I think this shouldn't be here but our current invalidation system needs it. // TODO(@Oscar): Get rid of this with the new invalidation system. 
event_bus_tx: broadcast::Sender, + + pub actors: Arc, } impl Debug for Library { @@ -97,6 +99,7 @@ impl Library { instance_uuid, env: node.env.clone(), event_bus_tx: node.event_bus.0.clone(), + actors: Default::default(), }) } diff --git a/core/src/library/manager/mod.rs b/core/src/library/manager/mod.rs index 6ee7bf2af..68640fc75 100644 --- a/core/src/library/manager/mod.rs +++ b/core/src/library/manager/mod.rs @@ -28,10 +28,7 @@ use std::{ collections::HashMap, path::{Path, PathBuf}, str::FromStr, - sync::{ - atomic::{self, AtomicBool}, - Arc, - }, + sync::{atomic::AtomicBool, Arc}, }; use chrono::Utc; @@ -481,9 +478,7 @@ impl Libraries { // This is an exception. Generally subscribe to this by `self.tx.subscribe`. tokio::spawn(sync_rx_actor(library.clone(), node.clone(), sync.rx)); - if node.cloud_sync_flag.load(atomic::Ordering::Relaxed) { - crate::cloud::sync::spawn_actors(&library, &node); - } + crate::cloud::sync::declare_actors(&library, node).await; self.tx .emit(LibraryManagerEvent::Load(library.clone())) diff --git a/core/src/library/mod.rs b/core/src/library/mod.rs index 6fefa8c17..01284ff22 100644 --- a/core/src/library/mod.rs +++ b/core/src/library/mod.rs @@ -1,4 +1,5 @@ // pub(crate) mod cat; +mod actors; mod config; #[allow(clippy::module_inception)] mod library; @@ -6,6 +7,7 @@ mod manager; mod name; // pub use cat::*; +pub use actors::*; pub use config::*; pub use library::*; pub use manager::*; diff --git a/crates/cloud-api/src/lib.rs b/crates/cloud-api/src/lib.rs index ec9fb78f5..e598e893a 100644 --- a/crates/cloud-api/src/lib.rs +++ b/crates/cloud-api/src/lib.rs @@ -66,7 +66,7 @@ pub mod library { return Err(Error("Authentication required".to_string())); }; - Ok(config + config .client .get(&format!( "{}/api/v1/libraries/{}", @@ -78,7 +78,7 @@ pub mod library { .map_err(|e| Error(e.to_string()))? .json() .await - .map_err(|e| Error(e.to_string()))?) + .map_err(|e| Error(e.to_string())) } pub type Response = Option; @@ -93,7 +93,7 @@ pub mod library { return Err(Error("Authentication required".to_string())); }; - Ok(config + config .client .get(&format!("{}/api/v1/libraries", config.api_url)) .with_auth(auth_token) @@ -102,7 +102,7 @@ pub mod library { .map_err(|e| Error(e.to_string()))? .json() .await - .map_err(|e| Error(e.to_string()))?) + .map_err(|e| Error(e.to_string())) } pub type Response = Vec; diff --git a/crates/crypto/src/keys/keyring/linux.rs b/crates/crypto/src/keys/keyring/linux.rs index 7786d1aea..6398e6475 100644 --- a/crates/crypto/src/keys/keyring/linux.rs +++ b/crates/crypto/src/keys/keyring/linux.rs @@ -50,7 +50,7 @@ impl<'a> Keyring for LinuxKeyring<'a> { let collection = self.get_collection()?; let items = collection.search_items(identifier.to_hashmap())?; - items.get(0).map_or(Err(Error::KeyringError), |k| { + items.first().map_or(Err(Error::KeyringError), |k| { Ok(Protected::new(k.get_secret()?)) }) } @@ -58,7 +58,7 @@ impl<'a> Keyring for LinuxKeyring<'a> { fn delete(&self, identifier: Identifier) -> Result<()> { self.get_collection()? .search_items(identifier.to_hashmap())? 
- .get(0) + .first() .map_or(Err(Error::KeyringError), |k| { k.delete()?; Ok(()) diff --git a/interface/app/$libraryId/Layout/Sidebar/Contents.tsx b/interface/app/$libraryId/Layout/Sidebar/Contents.tsx index 33caae942..25d70522b 100644 --- a/interface/app/$libraryId/Layout/Sidebar/Contents.tsx +++ b/interface/app/$libraryId/Layout/Sidebar/Contents.tsx @@ -1,16 +1,16 @@ -import { ArrowsClockwise, Cloud } from '@phosphor-icons/react'; +import { ArrowsClockwise, Cloud, Database, Factory } from '@phosphor-icons/react'; import { LibraryContextProvider, useClientContext, useFeatureFlag } from '@sd/client'; import { EphemeralSection } from './EphemeralSection'; import Icon from './Icon'; import { LibrarySection } from './LibrarySection'; import SidebarLink from './Link'; +import Section from './Section'; export default () => { const { library } = useClientContext(); - const showSyncRoute = useFeatureFlag('syncRoute'); - const showCloud = useFeatureFlag('cloud'); + const debugRoutes = useFeatureFlag('debugRoutes'); return (
@@ -23,21 +23,27 @@ export default () => { Imports */} - {(showSyncRoute || showCloud) && ( -
- {showSyncRoute && ( - + {debugRoutes && ( +
+
+ Sync - )} - {showCloud && ( - + Cloud - )} -
+ + + Cache + + + + Actors + +
+ )} {library && ( diff --git a/interface/app/$libraryId/debug/actors.tsx b/interface/app/$libraryId/debug/actors.tsx new file mode 100644 index 000000000..38fc0e171 --- /dev/null +++ b/interface/app/$libraryId/debug/actors.tsx @@ -0,0 +1,68 @@ +import { inferSubscriptionResult } from '@rspc/client'; +import { useMemo, useState } from 'react'; +import { Procedures, useLibraryMutation, useLibrarySubscription } from '@sd/client'; +import { Button } from '@sd/ui'; +import { useRouteTitle } from '~/hooks/useRouteTitle'; + +export const Component = () => { + useRouteTitle('Actors'); + + const [data, setData] = useState>({}); + + useLibrarySubscription(['library.actors'], { onData: setData }); + + const sortedData = useMemo(() => { + const sorted = Object.entries(data).sort(([a], [b]) => a.localeCompare(b)); + return sorted; + }, [data]); + + return ( +
+ + + + + + {sortedData.map(([name, running]) => ( + + + + + + ))} +
NameRunning
{name} + {running ? 'Running' : 'Not Running'} + + {running ? : } +
+
+ ); +}; + +function StartButton({ name }: { name: string }) { + const startActor = useLibraryMutation(['library.startActor']); + + return ( + + ); +} + +function StopButton({ name }: { name: string }) { + const stopActor = useLibraryMutation(['library.stopActor']); + + return ( + + ); +} diff --git a/interface/app/$libraryId/cloud.tsx b/interface/app/$libraryId/debug/cloud.tsx similarity index 100% rename from interface/app/$libraryId/cloud.tsx rename to interface/app/$libraryId/debug/cloud.tsx diff --git a/interface/app/$libraryId/debug/index.ts b/interface/app/$libraryId/debug/index.ts index 7c6058b2e..1eaf21984 100644 --- a/interface/app/$libraryId/debug/index.ts +++ b/interface/app/$libraryId/debug/index.ts @@ -1,5 +1,8 @@ import { RouteObject } from 'react-router'; -export const debugRoutes: RouteObject = { - children: [{ path: 'cache', lazy: () => import('./cache') }] -}; +export const debugRoutes = [ + { path: 'cache', lazy: () => import('./cache') }, + { path: 'cloud', lazy: () => import('./cloud') }, + { path: 'sync', lazy: () => import('./sync') }, + { path: 'actors', lazy: () => import('./actors') } +] satisfies RouteObject[]; diff --git a/interface/app/$libraryId/sync.tsx b/interface/app/$libraryId/debug/sync.tsx similarity index 100% rename from interface/app/$libraryId/sync.tsx rename to interface/app/$libraryId/debug/sync.tsx diff --git a/interface/app/$libraryId/index.tsx b/interface/app/$libraryId/index.tsx index ae9d8c9fe..1caebd1aa 100644 --- a/interface/app/$libraryId/index.tsx +++ b/interface/app/$libraryId/index.tsx @@ -1,6 +1,5 @@ import { redirect } from '@remix-run/router'; -import { Navigate, useRouteError, type RouteObject } from 'react-router-dom'; -import { useHomeDir } from '~/hooks/useHomeDir'; +import { type RouteObject } from 'react-router-dom'; import { Platform } from '~/util/Platform'; import { debugRoutes } from './debug'; @@ -13,9 +12,7 @@ const pageRoutes: RouteObject = { { path: 'people', lazy: () => import('./people') }, { path: 'media', lazy: () => import('./media') }, { path: 'spaces', lazy: () => import('./spaces') }, - { path: 'sync', lazy: () => import('./sync') }, - { path: 'cloud', lazy: () => import('./cloud') }, - { path: 'debug', children: [debugRoutes] } + { path: 'debug', children: debugRoutes } ] }; @@ -27,10 +24,7 @@ const explorerRoutes: RouteObject[] = [ { path: 'node/:id', lazy: () => import('./node/$id') }, { path: 'tag/:id', lazy: () => import('./tag/$id') }, { path: 'network', lazy: () => import('./network') }, - { - path: 'saved-search/:id', - lazy: () => import('./saved-search/$id') - } + { path: 'saved-search/:id', lazy: () => import('./saved-search/$id') } ]; // Routes that should render with the top bar - pretty much everything except diff --git a/interface/app/onboarding/new-library.tsx b/interface/app/onboarding/new-library.tsx index e61970dc7..425d1c0d4 100644 --- a/interface/app/onboarding/new-library.tsx +++ b/interface/app/onboarding/new-library.tsx @@ -19,7 +19,7 @@ export default function OnboardingNewLibrary() { // TODO }; - const cloudFeatureFlag = useFeatureFlag('cloud'); + const cloudFeatureFlag = useFeatureFlag('cloudSync'); return (
} | { key: "library.delete", input: string, result: null } | { key: "library.edit", input: EditLibraryArgs, result: null } | + { key: "library.startActor", input: LibraryArgs, result: null } | + { key: "library.stopActor", input: LibraryArgs, result: null } | { key: "locations.addLibrary", input: LibraryArgs, result: number | null } | { key: "locations.create", input: LibraryArgs, result: number | null } | { key: "locations.delete", input: LibraryArgs, result: null } | @@ -112,6 +114,7 @@ export type Procedures = { { key: "invalidation.listen", input: never, result: InvalidateOperationEvent[] } | { key: "jobs.newThumbnail", input: LibraryArgs, result: string[] } | { key: "jobs.progress", input: LibraryArgs, result: JobProgressEvent } | + { key: "library.actors", input: LibraryArgs, result: { [key in string]: boolean } } | { key: "locations.online", input: never, result: number[][] } | { key: "locations.quickRescan", input: LibraryArgs, result: null } | { key: "notifications.listen", input: never, result: Notification } | diff --git a/packages/client/src/hooks/useFeatureFlag.tsx b/packages/client/src/hooks/useFeatureFlag.tsx index 33f1cc0e9..39f7854d6 100644 --- a/packages/client/src/hooks/useFeatureFlag.tsx +++ b/packages/client/src/hooks/useFeatureFlag.tsx @@ -5,7 +5,7 @@ import type { BackendFeature } from '../core'; import { valtioPersist } from '../lib/valito'; import { nonLibraryClient, useBridgeQuery } from '../rspc'; -export const features = ['spacedrop', 'p2pPairing', 'syncRoute', 'backups', 'cloud'] as const; +export const features = ['spacedrop', 'p2pPairing', 'backups', 'debugRoutes'] as const; // This defines which backend feature flags show up in the UI. // This is kinda a hack to not having the runtime array of possible features as Specta only exports the types.
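
A minimal usage sketch of the `Actors` manager this patch adds in core/src/library/actors.rs. The `declare` / `start` / `stop` / `get_state` calls mirror how core/src/cloud/sync/mod.rs and the new `library.startActor` / `library.stopActor` / `library.actors` procedures drive it; the crate path, the actor name, the loop body and the `main` harness below are illustrative assumptions, not part of the patch.

use std::{sync::Arc, time::Duration};

// Assumed import path: core/src/library/mod.rs re-exports the module via `pub use actors::*;`.
use sd_core::library::Actors;

#[tokio::main]
async fn main() {
	let actors = Arc::new(Actors::default());

	// Declare a named actor. The factory closure is stored and cloned on every start,
	// so a stopped actor can be restarted with a fresh future. `autostart = false`
	// leaves it idle until `start` is called (the cloud-sync actors pass the
	// cloud-sync feature flag here instead).
	actors
		.declare(
			"Example Actor", // hypothetical name, for illustration
			|| async {
				loop {
					// stand-in for real actor work
					tokio::time::sleep(Duration::from_secs(1)).await;
				}
			},
			false,
		)
		.await;

	// Starting stores the spawned task's AbortHandle, so `get_state` reports the actor
	// as running; transitions also end up pinging `invalidate_tx`, which is what the
	// new `library.actors` subscription listens to before re-sending the state map.
	actors.start("Example Actor").await;
	assert_eq!(actors.get_state().await.get("Example Actor"), Some(&true));

	// Stopping aborts the task and clears the stored handle.
	actors.stop("Example Actor").await;
	assert_eq!(actors.get_state().await.get("Example Actor"), Some(&false));
}

Note how the cloud-sync run_actor functions now take their arguments as a single tuple, so each declare call just moves one pre-built value into its closure.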