mirror of
https://github.com/spacedriveapp/spacedrive
synced 2024-07-05 08:03:28 +00:00
Basic actor manager (#1888)
* basic declared actor manager * put actors in separate file * clippy * hopefully clean up some clippy warnings --------- Co-authored-by: jake <77554505+brxken128@users.noreply.github.com>
This commit is contained in:
parent
b5aa4970e5
commit
3cabc9c3a9
|
@ -257,13 +257,10 @@ async fn write_crdt_op_to_db(
|
|||
) -> Result<(), prisma_client_rust::QueryError> {
|
||||
match &op.typ {
|
||||
CRDTOperationType::Shared(shared_op) => {
|
||||
shared_op_db(&op, shared_op).to_query(&db).exec().await?;
|
||||
shared_op_db(op, shared_op).to_query(db).exec().await?;
|
||||
}
|
||||
CRDTOperationType::Relation(relation_op) => {
|
||||
relation_op_db(&op, relation_op)
|
||||
.to_query(&db)
|
||||
.exec()
|
||||
.await?;
|
||||
relation_op_db(op, relation_op).to_query(db).exec().await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ pub struct New {
|
|||
}
|
||||
|
||||
impl Manager {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new(
|
||||
db: &Arc<PrismaClient>,
|
||||
instance: Uuid,
|
||||
|
|
|
@ -59,7 +59,12 @@ mod library {
|
|||
.libraries
|
||||
.create_with_uuid(
|
||||
library_id,
|
||||
LibraryName::new(cloud_library.name).unwrap(),
|
||||
LibraryName::new(cloud_library.name).map_err(|e| {
|
||||
rspc::Error::new(
|
||||
rspc::ErrorCode::InternalServerError,
|
||||
e.to_string(),
|
||||
)
|
||||
})?,
|
||||
None,
|
||||
false,
|
||||
None,
|
||||
|
|
|
@ -323,4 +323,38 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
node.libraries.delete(&id).await.map_err(Into::into)
|
||||
}),
|
||||
)
|
||||
.procedure(
|
||||
"actors",
|
||||
R.with2(library()).subscription(|(_, library), _: ()| {
|
||||
let mut rx = library.actors.invalidate_rx.resubscribe();
|
||||
|
||||
async_stream::stream! {
|
||||
let actors = library.actors.get_state().await;
|
||||
yield actors;
|
||||
|
||||
while let Ok(()) = rx.recv().await {
|
||||
let actors = library.actors.get_state().await;
|
||||
yield actors;
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
.procedure(
|
||||
"startActor",
|
||||
R.with2(library())
|
||||
.mutation(|(_, library), name: String| async move {
|
||||
library.actors.start(&name).await;
|
||||
|
||||
Ok(())
|
||||
}),
|
||||
)
|
||||
.procedure(
|
||||
"stopActor",
|
||||
R.with2(library())
|
||||
.mutation(|(_, library), name: String| async move {
|
||||
library.actors.stop(&name).await;
|
||||
|
||||
Ok(())
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -26,17 +26,12 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
}
|
||||
|
||||
// TODO: Don't block subscription start
|
||||
for identity in node
|
||||
.p2p
|
||||
.manager
|
||||
.get_connected_peers()
|
||||
.await
|
||||
.map_err(|_err| {
|
||||
rspc::Error::new(
|
||||
ErrorCode::InternalServerError,
|
||||
"todo: error getting connected peers".into(),
|
||||
)
|
||||
})? {
|
||||
for identity in node.p2p.manager.get_connected_peers().await.map_err(|_| {
|
||||
rspc::Error::new(
|
||||
ErrorCode::InternalServerError,
|
||||
"todo: error getting connected peers".into(),
|
||||
)
|
||||
})? {
|
||||
queued.push(P2PEvent::ConnectedPeer { identity });
|
||||
}
|
||||
|
||||
|
@ -88,7 +83,11 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
})
|
||||
})
|
||||
.procedure("cancelSpacedrop", {
|
||||
R.mutation(|node, id: Uuid| async move { Ok(node.p2p.cancel_spacedrop(id).await) })
|
||||
R.mutation(|node, id: Uuid| async move {
|
||||
node.p2p.cancel_spacedrop(id).await;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.procedure("pair", {
|
||||
R.mutation(|node, id: RemoteIdentity| async move {
|
||||
|
@ -98,6 +97,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
.procedure("pairingResponse", {
|
||||
R.mutation(|node, (pairing_id, decision): (u16, PairingDecision)| {
|
||||
node.p2p.pairing.decision(pairing_id, decision);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
|
|
|
@ -15,8 +15,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
|||
blake3::hash(
|
||||
&i.mount_points
|
||||
.iter()
|
||||
.map(|mp| mp.as_os_str().to_string_lossy().as_bytes().to_vec())
|
||||
.flatten()
|
||||
.flat_map(|mp| mp.as_os_str().to_string_lossy().as_bytes().to_vec())
|
||||
.collect::<Vec<u8>>(),
|
||||
)
|
||||
.to_hex()
|
||||
|
|
|
@ -1,47 +1,54 @@
|
|||
use crate::cloud::sync::err_return;
|
||||
|
||||
use super::Library;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
pub async fn run_actor(library: Arc<Library>, notify: Arc<Notify>) {
|
||||
pub async fn run_actor((library, notify): (Arc<Library>, Arc<Notify>)) {
|
||||
let Library { sync, .. } = library.as_ref();
|
||||
|
||||
loop {
|
||||
let mut rx = sync.ingest.req_rx.lock().await;
|
||||
{
|
||||
let mut rx = sync.ingest.req_rx.lock().await;
|
||||
|
||||
sync.ingest
|
||||
.event_tx
|
||||
.send(sd_core_sync::Event::Notification)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
use crate::sync::ingest::*;
|
||||
|
||||
while let Some(req) = rx.recv().await {
|
||||
const OPS_PER_REQUEST: u32 = 1000;
|
||||
|
||||
let timestamps = match req {
|
||||
Request::FinishedIngesting => break,
|
||||
Request::Messages { timestamps } => timestamps,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
let ops = sync
|
||||
.get_cloud_ops(crate::sync::GetOpsArgs {
|
||||
clocks: timestamps,
|
||||
count: OPS_PER_REQUEST,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
sync.ingest
|
||||
if sync
|
||||
.ingest
|
||||
.event_tx
|
||||
.send(sd_core_sync::Event::Messages(MessagesEvent {
|
||||
instance_id: library.sync.instance,
|
||||
has_more: ops.len() == 1000,
|
||||
messages: ops,
|
||||
}))
|
||||
.send(sd_core_sync::Event::Notification)
|
||||
.await
|
||||
.unwrap();
|
||||
.is_ok()
|
||||
{
|
||||
use crate::sync::ingest::*;
|
||||
|
||||
while let Some(req) = rx.recv().await {
|
||||
const OPS_PER_REQUEST: u32 = 1000;
|
||||
|
||||
let timestamps = match req {
|
||||
Request::FinishedIngesting => break,
|
||||
Request::Messages { timestamps } => timestamps,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
let ops = err_return!(
|
||||
sync.get_cloud_ops(crate::sync::GetOpsArgs {
|
||||
clocks: timestamps,
|
||||
count: OPS_PER_REQUEST,
|
||||
})
|
||||
.await
|
||||
);
|
||||
|
||||
err_return!(
|
||||
sync.ingest
|
||||
.event_tx
|
||||
.send(sd_core_sync::Event::Messages(MessagesEvent {
|
||||
instance_id: library.sync.instance,
|
||||
has_more: ops.len() == 1000,
|
||||
messages: ops,
|
||||
}))
|
||||
.await
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
notify.notified().await;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::sync::Arc;
|
||||
use std::sync::{atomic, Arc};
|
||||
|
||||
use crate::{library::Library, Node};
|
||||
|
||||
|
@ -6,16 +6,34 @@ mod ingest;
|
|||
mod receive;
|
||||
mod send;
|
||||
|
||||
pub fn spawn_actors(library: &Arc<Library>, node: &Arc<Node>) {
|
||||
pub async fn declare_actors(library: &Arc<Library>, node: &Arc<Node>) {
|
||||
let ingest_notify = Arc::new(Notify::new());
|
||||
let actors = &library.actors;
|
||||
|
||||
tokio::spawn(send::run_actor(library.clone(), node.clone()));
|
||||
tokio::spawn(receive::run_actor(
|
||||
library.clone(),
|
||||
node.clone(),
|
||||
ingest_notify.clone(),
|
||||
));
|
||||
tokio::spawn(ingest::run_actor(library.clone(), ingest_notify));
|
||||
let autorun = node.cloud_sync_flag.load(atomic::Ordering::Relaxed);
|
||||
|
||||
let args = (library.clone(), node.clone());
|
||||
actors
|
||||
.declare("Cloud Sync Sender", move || send::run_actor(args), autorun)
|
||||
.await;
|
||||
|
||||
let args = (library.clone(), node.clone(), ingest_notify.clone());
|
||||
actors
|
||||
.declare(
|
||||
"Cloud Sync Receiver",
|
||||
move || receive::run_actor(args),
|
||||
autorun,
|
||||
)
|
||||
.await;
|
||||
|
||||
let args = (library.clone(), ingest_notify);
|
||||
actors
|
||||
.declare(
|
||||
"Cloud Sync Ingest",
|
||||
move || ingest::run_actor(args),
|
||||
autorun,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
macro_rules! err_break {
|
||||
|
|
|
@ -14,14 +14,15 @@ use sd_sync::*;
|
|||
use sd_utils::{from_bytes_to_uuid, uuid_to_bytes};
|
||||
use serde::Deserialize;
|
||||
use serde_json::{json, to_vec};
|
||||
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{sync::Notify, time::sleep};
|
||||
use tracing::debug;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub async fn run_actor(library: Arc<Library>, node: Arc<Node>, ingest_notify: Arc<Notify>) {
|
||||
debug!("receive actor running");
|
||||
|
||||
pub async fn run_actor((library, node, ingest_notify): (Arc<Library>, Arc<Node>, Arc<Notify>)) {
|
||||
let db = &library.db;
|
||||
let api_url = &library.env.api_url;
|
||||
let library_id = library.id;
|
||||
|
@ -45,7 +46,9 @@ pub async fn run_actor(library: Arc<Library>, node: Arc<Node>, ingest_notify: Ar
|
|||
.zip(timestamps.keys())
|
||||
.map(|(d, id)| {
|
||||
let cloud_timestamp = NTP64(d.map(|d| d.timestamp).unwrap_or_default() as u64);
|
||||
let sync_timestamp = *timestamps.get(id).unwrap();
|
||||
let sync_timestamp = *timestamps
|
||||
.get(id)
|
||||
.expect("unable to find matching timestamp");
|
||||
|
||||
let max_timestamp = Ord::max(cloud_timestamp, sync_timestamp);
|
||||
|
||||
|
@ -85,28 +88,27 @@ pub async fn run_actor(library: Arc<Library>, node: Arc<Node>, ingest_notify: Ar
|
|||
}
|
||||
|
||||
{
|
||||
let collections = err_break!(
|
||||
err_break!(
|
||||
node.authed_api_request(
|
||||
node.http
|
||||
.post(&format!(
|
||||
"{api_url}/api/v1/libraries/{library_id}/messageCollections/get"
|
||||
))
|
||||
.json(&json!({
|
||||
"instanceUuid": library.instance_uuid,
|
||||
"timestamps": instances
|
||||
})),
|
||||
)
|
||||
.await
|
||||
let collections = node
|
||||
.authed_api_request(
|
||||
node.http
|
||||
.post(&format!(
|
||||
"{api_url}/api/v1/libraries/{library_id}/messageCollections/get"
|
||||
))
|
||||
.json(&json!({
|
||||
"instanceUuid": library.instance_uuid,
|
||||
"timestamps": instances
|
||||
})),
|
||||
)
|
||||
.await
|
||||
.expect("couldn't get response")
|
||||
.json::<Vec<MessageCollection>>()
|
||||
.await
|
||||
);
|
||||
.expect("couldn't deserialize response");
|
||||
|
||||
let mut cloud_library_data: Option<Option<sd_cloud_api::Library>> = None;
|
||||
|
||||
for collection in collections {
|
||||
if !cloud_timestamps.contains_key(&collection.instance_uuid) {
|
||||
if let Entry::Vacant(e) = cloud_timestamps.entry(collection.instance_uuid) {
|
||||
let fetched_library = match &cloud_library_data {
|
||||
None => {
|
||||
let Some(fetched_library) = err_break!(
|
||||
|
@ -122,7 +124,7 @@ pub async fn run_actor(library: Arc<Library>, node: Arc<Node>, ingest_notify: Ar
|
|||
cloud_library_data
|
||||
.insert(Some(fetched_library))
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.expect("error inserting fetched library")
|
||||
}
|
||||
Some(None) => {
|
||||
break;
|
||||
|
@ -140,14 +142,14 @@ pub async fn run_actor(library: Arc<Library>, node: Arc<Node>, ingest_notify: Ar
|
|||
|
||||
err_break!(
|
||||
create_instance(
|
||||
&db,
|
||||
db,
|
||||
collection.instance_uuid,
|
||||
err_break!(BASE64_STANDARD.decode(instance.identity.clone()))
|
||||
)
|
||||
.await
|
||||
);
|
||||
|
||||
cloud_timestamps.insert(collection.instance_uuid, NTP64(0));
|
||||
e.insert(NTP64(0));
|
||||
}
|
||||
|
||||
err_break!(
|
||||
|
@ -160,7 +162,8 @@ pub async fn run_actor(library: Arc<Library>, node: Arc<Node>, ingest_notify: Ar
|
|||
.await
|
||||
);
|
||||
|
||||
let collection_timestamp = NTP64(collection.end_time.parse().unwrap());
|
||||
let collection_timestamp =
|
||||
NTP64(collection.end_time.parse().expect("unable to parse time"));
|
||||
|
||||
let timestamp = cloud_timestamps
|
||||
.entry(collection.instance_uuid)
|
||||
|
@ -184,10 +187,10 @@ async fn write_cloud_ops_to_db(
|
|||
) -> Result<(), prisma_client_rust::QueryError> {
|
||||
let (shared, relation): (Vec<_>, Vec<_>) = ops.into_iter().partition_map(|op| match &op.typ {
|
||||
CRDTOperationType::Shared(shared_op) => {
|
||||
Either::Left(shared_op_db(&op, &shared_op).to_query(&db))
|
||||
Either::Left(shared_op_db(&op, shared_op).to_query(db))
|
||||
}
|
||||
CRDTOperationType::Relation(relation_op) => {
|
||||
Either::Right(relation_op_db(&op, &relation_op).to_query(&db))
|
||||
Either::Right(relation_op_db(&op, relation_op).to_query(db))
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -202,9 +205,9 @@ fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> cloud_shared
|
|||
timestamp: op.timestamp.0 as i64,
|
||||
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
|
||||
kind: shared_op.kind().to_string(),
|
||||
data: to_vec(&shared_op.data).unwrap(),
|
||||
data: to_vec(&shared_op.data).expect("unable to serialize data"),
|
||||
model: shared_op.model.to_string(),
|
||||
record_id: to_vec(&shared_op.record_id).unwrap(),
|
||||
record_id: to_vec(&shared_op.record_id).expect("unable to serialize record id"),
|
||||
_params: vec![],
|
||||
}
|
||||
}
|
||||
|
@ -218,10 +221,10 @@ fn relation_op_db(
|
|||
timestamp: op.timestamp.0 as i64,
|
||||
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
|
||||
kind: relation_op.kind().to_string(),
|
||||
data: to_vec(&relation_op.data).unwrap(),
|
||||
data: to_vec(&relation_op.data).expect("unable to serialize data"),
|
||||
relation: relation_op.relation.to_string(),
|
||||
item_id: to_vec(&relation_op.relation_item).unwrap(),
|
||||
group_id: to_vec(&relation_op.relation_group).unwrap(),
|
||||
item_id: to_vec(&relation_op.relation_item).expect("unable to serialize item id"),
|
||||
group_id: to_vec(&relation_op.relation_group).expect("unable to serialize group id"),
|
||||
_params: vec![],
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,16 +9,12 @@ use std::{sync::Arc, time::Duration};
|
|||
use tokio::time::sleep;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub async fn run_actor(library: Arc<Library>, node: Arc<Node>) {
|
||||
pub async fn run_actor((library, node): (Arc<Library>, Arc<Node>)) {
|
||||
let db = &library.db;
|
||||
let api_url = &library.env.api_url;
|
||||
let library_id = library.id;
|
||||
|
||||
loop {
|
||||
println!("send_actor run");
|
||||
|
||||
println!("send_actor sending");
|
||||
|
||||
loop {
|
||||
let instances = err_break!(
|
||||
db.instance()
|
||||
|
@ -55,8 +51,6 @@ pub async fn run_actor(library: Arc<Library>, node: Arc<Node>) {
|
|||
.await
|
||||
);
|
||||
|
||||
println!("Add Requests: {req_adds:#?}");
|
||||
|
||||
let mut instances = vec![];
|
||||
|
||||
for req_add in req_adds {
|
||||
|
@ -72,14 +66,14 @@ pub async fn run_actor(library: Arc<Library>, node: Arc<Node>) {
|
|||
.from_time
|
||||
.unwrap_or_else(|| "0".to_string())
|
||||
.parse()
|
||||
.unwrap(),
|
||||
.expect("couldn't parse ntp64 value"),
|
||||
),
|
||||
)],
|
||||
})
|
||||
.await
|
||||
);
|
||||
|
||||
if ops.len() == 0 {
|
||||
if ops.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -100,11 +94,11 @@ pub async fn run_actor(library: Arc<Library>, node: Arc<Node>) {
|
|||
"Number of messages: {}",
|
||||
instances
|
||||
.iter()
|
||||
.map(|i| i["contents"].as_array().unwrap().len())
|
||||
.map(|i| i["contents"].as_array().expect("no contents found").len())
|
||||
.sum::<usize>()
|
||||
);
|
||||
|
||||
if instances.len() == 0 {
|
||||
if instances.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -115,7 +109,7 @@ pub async fn run_actor(library: Arc<Library>, node: Arc<Node>) {
|
|||
// from_time: String,
|
||||
}
|
||||
|
||||
let responses = err_break!(
|
||||
let _responses = err_break!(
|
||||
err_break!(
|
||||
node.authed_api_request(
|
||||
node.http
|
||||
|
@ -129,8 +123,6 @@ pub async fn run_actor(library: Arc<Library>, node: Arc<Node>) {
|
|||
.json::<Vec<DoAdd>>()
|
||||
.await
|
||||
);
|
||||
|
||||
println!("DoAdd Responses: {responses:#?}");
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -145,8 +137,6 @@ pub async fn run_actor(library: Arc<Library>, node: Arc<Node>) {
|
|||
}
|
||||
}
|
||||
|
||||
println!("send_actor sleeping");
|
||||
|
||||
sleep(Duration::from_millis(1000)).await;
|
||||
}
|
||||
}
|
||||
|
|
123
core/src/library/actors.rs
Normal file
123
core/src/library/actors.rs
Normal file
|
@ -0,0 +1,123 @@
|
|||
use futures::Future;
|
||||
use std::{collections::HashMap, pin::Pin, sync::Arc};
|
||||
use tokio::{
|
||||
sync::{broadcast, oneshot, Mutex},
|
||||
task::AbortHandle,
|
||||
};
|
||||
|
||||
pub struct Actor {
|
||||
pub abort_handle: Mutex<Option<AbortHandle>>,
|
||||
pub spawn_fn: Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
|
||||
}
|
||||
|
||||
pub struct Actors {
|
||||
pub invalidate_rx: broadcast::Receiver<()>,
|
||||
invalidate_tx: broadcast::Sender<()>,
|
||||
actors: Arc<Mutex<HashMap<String, Arc<Actor>>>>,
|
||||
}
|
||||
|
||||
impl Actors {
|
||||
pub async fn declare<F: Future<Output = ()> + Send + 'static>(
|
||||
self: &Arc<Self>,
|
||||
name: &str,
|
||||
actor_fn: impl FnOnce() -> F + Send + Sync + Clone + 'static,
|
||||
autostart: bool,
|
||||
) {
|
||||
let mut actors = self.actors.lock().await;
|
||||
|
||||
actors.insert(
|
||||
name.to_string(),
|
||||
Arc::new(Actor {
|
||||
abort_handle: Default::default(),
|
||||
spawn_fn: Arc::new(move || Box::pin((actor_fn.clone())()) as Pin<Box<_>>),
|
||||
}),
|
||||
);
|
||||
|
||||
if autostart {
|
||||
self.start(name).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(self: &Arc<Self>, name: &str) {
|
||||
let name = name.to_string();
|
||||
let actors = self.actors.lock().await;
|
||||
|
||||
let Some(actor) = actors.get(&name).cloned() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let mut abort_handle = actor.abort_handle.lock().await;
|
||||
if abort_handle.is_some() {
|
||||
return;
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let invalidate_tx = self.invalidate_tx.clone();
|
||||
|
||||
let spawn_fn = actor.spawn_fn.clone();
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
(spawn_fn)().await;
|
||||
|
||||
tx.send(()).ok();
|
||||
});
|
||||
|
||||
*abort_handle = Some(task.abort_handle());
|
||||
invalidate_tx.send(()).ok();
|
||||
|
||||
tokio::spawn({
|
||||
let actor = actor.clone();
|
||||
async move {
|
||||
#[allow(clippy::match_single_binding)]
|
||||
match rx.await {
|
||||
_ => {}
|
||||
};
|
||||
|
||||
actor.abort_handle.lock().await.take();
|
||||
invalidate_tx.send(()).ok();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn stop(self: &Arc<Self>, name: &str) {
|
||||
let name = name.to_string();
|
||||
let actors = self.actors.lock().await;
|
||||
|
||||
let Some(actor) = actors.get(&name).cloned() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let mut abort_handle = actor.abort_handle.lock().await;
|
||||
|
||||
if let Some(abort_handle) = abort_handle.take() {
|
||||
abort_handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_state(&self) -> HashMap<String, bool> {
|
||||
let actors = self.actors.lock().await;
|
||||
|
||||
let mut state = HashMap::new();
|
||||
|
||||
for (name, actor) in &*actors {
|
||||
state.insert(name.to_string(), actor.abort_handle.lock().await.is_some());
|
||||
}
|
||||
|
||||
state
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Actors {
|
||||
fn default() -> Self {
|
||||
let actors = Default::default();
|
||||
|
||||
let (invalidate_tx, invalidate_rx) = broadcast::channel(1);
|
||||
|
||||
Self {
|
||||
actors,
|
||||
invalidate_rx,
|
||||
invalidate_tx,
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,7 +27,7 @@ use tokio::{fs, io, sync::broadcast, sync::RwLock};
|
|||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{LibraryConfig, LibraryManagerError};
|
||||
use super::{Actors, LibraryConfig, LibraryManagerError};
|
||||
|
||||
// TODO: Finish this
|
||||
// pub enum LibraryNew {
|
||||
|
@ -60,6 +60,8 @@ pub struct Library {
|
|||
// Look, I think this shouldn't be here but our current invalidation system needs it.
|
||||
// TODO(@Oscar): Get rid of this with the new invalidation system.
|
||||
event_bus_tx: broadcast::Sender<CoreEvent>,
|
||||
|
||||
pub actors: Arc<Actors>,
|
||||
}
|
||||
|
||||
impl Debug for Library {
|
||||
|
@ -97,6 +99,7 @@ impl Library {
|
|||
instance_uuid,
|
||||
env: node.env.clone(),
|
||||
event_bus_tx: node.event_bus.0.clone(),
|
||||
actors: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -28,10 +28,7 @@ use std::{
|
|||
collections::HashMap,
|
||||
path::{Path, PathBuf},
|
||||
str::FromStr,
|
||||
sync::{
|
||||
atomic::{self, AtomicBool},
|
||||
Arc,
|
||||
},
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
};
|
||||
|
||||
use chrono::Utc;
|
||||
|
@ -481,9 +478,7 @@ impl Libraries {
|
|||
// This is an exception. Generally subscribe to this by `self.tx.subscribe`.
|
||||
tokio::spawn(sync_rx_actor(library.clone(), node.clone(), sync.rx));
|
||||
|
||||
if node.cloud_sync_flag.load(atomic::Ordering::Relaxed) {
|
||||
crate::cloud::sync::spawn_actors(&library, &node);
|
||||
}
|
||||
crate::cloud::sync::declare_actors(&library, node).await;
|
||||
|
||||
self.tx
|
||||
.emit(LibraryManagerEvent::Load(library.clone()))
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
// pub(crate) mod cat;
|
||||
mod actors;
|
||||
mod config;
|
||||
#[allow(clippy::module_inception)]
|
||||
mod library;
|
||||
|
@ -6,6 +7,7 @@ mod manager;
|
|||
mod name;
|
||||
|
||||
// pub use cat::*;
|
||||
pub use actors::*;
|
||||
pub use config::*;
|
||||
pub use library::*;
|
||||
pub use manager::*;
|
||||
|
|
|
@ -66,7 +66,7 @@ pub mod library {
|
|||
return Err(Error("Authentication required".to_string()));
|
||||
};
|
||||
|
||||
Ok(config
|
||||
config
|
||||
.client
|
||||
.get(&format!(
|
||||
"{}/api/v1/libraries/{}",
|
||||
|
@ -78,7 +78,7 @@ pub mod library {
|
|||
.map_err(|e| Error(e.to_string()))?
|
||||
.json()
|
||||
.await
|
||||
.map_err(|e| Error(e.to_string()))?)
|
||||
.map_err(|e| Error(e.to_string()))
|
||||
}
|
||||
|
||||
pub type Response = Option<Library>;
|
||||
|
@ -93,7 +93,7 @@ pub mod library {
|
|||
return Err(Error("Authentication required".to_string()));
|
||||
};
|
||||
|
||||
Ok(config
|
||||
config
|
||||
.client
|
||||
.get(&format!("{}/api/v1/libraries", config.api_url))
|
||||
.with_auth(auth_token)
|
||||
|
@ -102,7 +102,7 @@ pub mod library {
|
|||
.map_err(|e| Error(e.to_string()))?
|
||||
.json()
|
||||
.await
|
||||
.map_err(|e| Error(e.to_string()))?)
|
||||
.map_err(|e| Error(e.to_string()))
|
||||
}
|
||||
|
||||
pub type Response = Vec<Library>;
|
||||
|
|
|
@ -50,7 +50,7 @@ impl<'a> Keyring for LinuxKeyring<'a> {
|
|||
let collection = self.get_collection()?;
|
||||
let items = collection.search_items(identifier.to_hashmap())?;
|
||||
|
||||
items.get(0).map_or(Err(Error::KeyringError), |k| {
|
||||
items.first().map_or(Err(Error::KeyringError), |k| {
|
||||
Ok(Protected::new(k.get_secret()?))
|
||||
})
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ impl<'a> Keyring for LinuxKeyring<'a> {
|
|||
fn delete(&self, identifier: Identifier) -> Result<()> {
|
||||
self.get_collection()?
|
||||
.search_items(identifier.to_hashmap())?
|
||||
.get(0)
|
||||
.first()
|
||||
.map_or(Err(Error::KeyringError), |k| {
|
||||
k.delete()?;
|
||||
Ok(())
|
||||
|
|
|
@ -1,16 +1,16 @@
|
|||
import { ArrowsClockwise, Cloud } from '@phosphor-icons/react';
|
||||
import { ArrowsClockwise, Cloud, Database, Factory } from '@phosphor-icons/react';
|
||||
import { LibraryContextProvider, useClientContext, useFeatureFlag } from '@sd/client';
|
||||
|
||||
import { EphemeralSection } from './EphemeralSection';
|
||||
import Icon from './Icon';
|
||||
import { LibrarySection } from './LibrarySection';
|
||||
import SidebarLink from './Link';
|
||||
import Section from './Section';
|
||||
|
||||
export default () => {
|
||||
const { library } = useClientContext();
|
||||
|
||||
const showSyncRoute = useFeatureFlag('syncRoute');
|
||||
const showCloud = useFeatureFlag('cloud');
|
||||
const debugRoutes = useFeatureFlag('debugRoutes');
|
||||
|
||||
return (
|
||||
<div className="no-scrollbar mask-fade-out flex grow flex-col space-y-5 overflow-x-hidden overflow-y-scroll pb-10">
|
||||
|
@ -23,21 +23,27 @@ export default () => {
|
|||
<Icon component={ArchiveBox} />
|
||||
Imports
|
||||
</SidebarLink> */}
|
||||
{(showSyncRoute || showCloud) && (
|
||||
<div className="space-y-0.5">
|
||||
{showSyncRoute && (
|
||||
<SidebarLink to="sync">
|
||||
{debugRoutes && (
|
||||
<Section name="Debug">
|
||||
<div className="space-y-0.5">
|
||||
<SidebarLink to="debug/sync">
|
||||
<Icon component={ArrowsClockwise} />
|
||||
Sync
|
||||
</SidebarLink>
|
||||
)}
|
||||
{showCloud && (
|
||||
<SidebarLink to="cloud">
|
||||
<SidebarLink to="debug/cloud">
|
||||
<Icon component={Cloud} />
|
||||
Cloud
|
||||
</SidebarLink>
|
||||
)}
|
||||
</div>
|
||||
<SidebarLink to="debug/cache">
|
||||
<Icon component={Database} />
|
||||
Cache
|
||||
</SidebarLink>
|
||||
<SidebarLink to="debug/actors">
|
||||
<Icon component={Factory} />
|
||||
Actors
|
||||
</SidebarLink>
|
||||
</div>
|
||||
</Section>
|
||||
)}
|
||||
<EphemeralSection />
|
||||
{library && (
|
||||
|
|
68
interface/app/$libraryId/debug/actors.tsx
Normal file
68
interface/app/$libraryId/debug/actors.tsx
Normal file
|
@ -0,0 +1,68 @@
|
|||
import { inferSubscriptionResult } from '@rspc/client';
|
||||
import { useMemo, useState } from 'react';
|
||||
import { Procedures, useLibraryMutation, useLibrarySubscription } from '@sd/client';
|
||||
import { Button } from '@sd/ui';
|
||||
import { useRouteTitle } from '~/hooks/useRouteTitle';
|
||||
|
||||
export const Component = () => {
|
||||
useRouteTitle('Actors');
|
||||
|
||||
const [data, setData] = useState<inferSubscriptionResult<Procedures, 'library.actors'>>({});
|
||||
|
||||
useLibrarySubscription(['library.actors'], { onData: setData });
|
||||
|
||||
const sortedData = useMemo(() => {
|
||||
const sorted = Object.entries(data).sort(([a], [b]) => a.localeCompare(b));
|
||||
return sorted;
|
||||
}, [data]);
|
||||
|
||||
return (
|
||||
<div className="h-full w-full">
|
||||
<table>
|
||||
<tr>
|
||||
<th>Name</th>
|
||||
<th>Running</th>
|
||||
</tr>
|
||||
{sortedData.map(([name, running]) => (
|
||||
<tr key={name}>
|
||||
<td className="pl-2 pr-4 text-left">{name}</td>
|
||||
<td className="pl-2 pr-4 text-left">
|
||||
{running ? 'Running' : 'Not Running'}
|
||||
</td>
|
||||
<td className="py-1">
|
||||
{running ? <StopButton name={name} /> : <StartButton name={name} />}
|
||||
</td>
|
||||
</tr>
|
||||
))}
|
||||
</table>
|
||||
</div>
|
||||
);
|
||||
};
|
||||
|
||||
function StartButton({ name }: { name: string }) {
|
||||
const startActor = useLibraryMutation(['library.startActor']);
|
||||
|
||||
return (
|
||||
<Button
|
||||
variant="accent"
|
||||
disabled={startActor.isLoading}
|
||||
onClick={() => startActor.mutate(name)}
|
||||
>
|
||||
{startActor.isLoading ? 'Starting...' : 'Start'}
|
||||
</Button>
|
||||
);
|
||||
}
|
||||
|
||||
function StopButton({ name }: { name: string }) {
|
||||
const stopActor = useLibraryMutation(['library.stopActor']);
|
||||
|
||||
return (
|
||||
<Button
|
||||
variant="accent"
|
||||
disabled={stopActor.isLoading}
|
||||
onClick={() => stopActor.mutate(name)}
|
||||
>
|
||||
{stopActor.isLoading ? 'Stopping...' : 'Stop'}
|
||||
</Button>
|
||||
);
|
||||
}
|
|
@ -1,5 +1,8 @@
|
|||
import { RouteObject } from 'react-router';
|
||||
|
||||
export const debugRoutes: RouteObject = {
|
||||
children: [{ path: 'cache', lazy: () => import('./cache') }]
|
||||
};
|
||||
export const debugRoutes = [
|
||||
{ path: 'cache', lazy: () => import('./cache') },
|
||||
{ path: 'cloud', lazy: () => import('./cloud') },
|
||||
{ path: 'sync', lazy: () => import('./sync') },
|
||||
{ path: 'actors', lazy: () => import('./actors') }
|
||||
] satisfies RouteObject[];
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import { redirect } from '@remix-run/router';
|
||||
import { Navigate, useRouteError, type RouteObject } from 'react-router-dom';
|
||||
import { useHomeDir } from '~/hooks/useHomeDir';
|
||||
import { type RouteObject } from 'react-router-dom';
|
||||
import { Platform } from '~/util/Platform';
|
||||
|
||||
import { debugRoutes } from './debug';
|
||||
|
@ -13,9 +12,7 @@ const pageRoutes: RouteObject = {
|
|||
{ path: 'people', lazy: () => import('./people') },
|
||||
{ path: 'media', lazy: () => import('./media') },
|
||||
{ path: 'spaces', lazy: () => import('./spaces') },
|
||||
{ path: 'sync', lazy: () => import('./sync') },
|
||||
{ path: 'cloud', lazy: () => import('./cloud') },
|
||||
{ path: 'debug', children: [debugRoutes] }
|
||||
{ path: 'debug', children: debugRoutes }
|
||||
]
|
||||
};
|
||||
|
||||
|
@ -27,10 +24,7 @@ const explorerRoutes: RouteObject[] = [
|
|||
{ path: 'node/:id', lazy: () => import('./node/$id') },
|
||||
{ path: 'tag/:id', lazy: () => import('./tag/$id') },
|
||||
{ path: 'network', lazy: () => import('./network') },
|
||||
{
|
||||
path: 'saved-search/:id',
|
||||
lazy: () => import('./saved-search/$id')
|
||||
}
|
||||
{ path: 'saved-search/:id', lazy: () => import('./saved-search/$id') }
|
||||
];
|
||||
|
||||
// Routes that should render with the top bar - pretty much everything except
|
||||
|
|
|
@ -19,7 +19,7 @@ export default function OnboardingNewLibrary() {
|
|||
// TODO
|
||||
};
|
||||
|
||||
const cloudFeatureFlag = useFeatureFlag('cloud');
|
||||
const cloudFeatureFlag = useFeatureFlag('cloudSync');
|
||||
|
||||
return (
|
||||
<Form
|
||||
|
|
|
@ -80,6 +80,8 @@ export type Procedures = {
|
|||
{ key: "library.create", input: CreateLibraryArgs, result: NormalisedResult<LibraryConfigWrapped> } |
|
||||
{ key: "library.delete", input: string, result: null } |
|
||||
{ key: "library.edit", input: EditLibraryArgs, result: null } |
|
||||
{ key: "library.startActor", input: LibraryArgs<string>, result: null } |
|
||||
{ key: "library.stopActor", input: LibraryArgs<string>, result: null } |
|
||||
{ key: "locations.addLibrary", input: LibraryArgs<LocationCreateArgs>, result: number | null } |
|
||||
{ key: "locations.create", input: LibraryArgs<LocationCreateArgs>, result: number | null } |
|
||||
{ key: "locations.delete", input: LibraryArgs<number>, result: null } |
|
||||
|
@ -112,6 +114,7 @@ export type Procedures = {
|
|||
{ key: "invalidation.listen", input: never, result: InvalidateOperationEvent[] } |
|
||||
{ key: "jobs.newThumbnail", input: LibraryArgs<null>, result: string[] } |
|
||||
{ key: "jobs.progress", input: LibraryArgs<null>, result: JobProgressEvent } |
|
||||
{ key: "library.actors", input: LibraryArgs<null>, result: { [key in string]: boolean } } |
|
||||
{ key: "locations.online", input: never, result: number[][] } |
|
||||
{ key: "locations.quickRescan", input: LibraryArgs<LightScanArgs>, result: null } |
|
||||
{ key: "notifications.listen", input: never, result: Notification } |
|
||||
|
|
|
@ -5,7 +5,7 @@ import type { BackendFeature } from '../core';
|
|||
import { valtioPersist } from '../lib/valito';
|
||||
import { nonLibraryClient, useBridgeQuery } from '../rspc';
|
||||
|
||||
export const features = ['spacedrop', 'p2pPairing', 'syncRoute', 'backups', 'cloud'] as const;
|
||||
export const features = ['spacedrop', 'p2pPairing', 'backups', 'debugRoutes'] as const;
|
||||
|
||||
// This defines which backend feature flags show up in the UI.
|
||||
// This is kinda a hack to not having the runtime array of possible features as Specta only exports the types.
|
||||
|
|
Loading…
Reference in a new issue