From 7bf0c0ae4a8b66c238c9c649f0d59660a3cacc29 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Sun, 27 Aug 2023 14:41:26 +0800 Subject: [PATCH] [ENG-994] Minor P2P fixes (#1238) * mdns advertisement on detection * fix bug in P2P discovery * Add `P2PEvent::ExpirePeer` * React properly * Consistent tracing versions * better logger config * Reduce excessive logging from P2P * Fix panic log hook * Remove `dbg` * Fix bug in thumbnailer remover * wip: Connected peers in UI * `Sync` is large pain * init services after library load * improve error handling of logger * Sync protocol shutdown * Add "updater" feature * fix commands bindings * nlm state debug query * nlm debug status * Make TS happy * Add `RemoteIdentity` to libraries debug info * Improve debug data presentation * Among us level sus * minor fix panic hook * Fix EOF issue * fix instance not being added on pairing --------- Co-authored-by: Utku <74243531+utkubakir@users.noreply.github.com> Co-authored-by: Ericson "Fogo" Soares --- Cargo.lock | 14 +- Cargo.toml | 6 + apps/desktop/src-tauri/Cargo.toml | 2 +- apps/desktop/src-tauri/src/main.rs | 10 +- apps/mobile/crates/android/Cargo.toml | 2 +- apps/mobile/crates/core/Cargo.toml | 2 +- apps/server/Cargo.toml | 2 +- apps/server/src/main.rs | 7 +- core/Cargo.toml | 6 +- core/src/api/libraries.rs | 9 +- core/src/api/p2p.rs | 13 +- core/src/lib.rs | 96 ++++---- core/src/library/library.rs | 5 + core/src/library/manager/mod.rs | 1 + core/src/object/thumbnail_remover.rs | 3 + core/src/p2p/p2p_manager.rs | 42 ++-- core/src/p2p/pairing/mod.rs | 1 + core/src/p2p/peer_metadata.rs | 34 ++- core/src/p2p/sync/mod.rs | 56 +++-- crates/ffmpeg/Cargo.toml | 2 +- crates/p2p/Cargo.toml | 4 +- crates/p2p/examples/basic.rs | 4 +- crates/p2p/src/discovery/mdns.rs | 41 +++- crates/p2p/src/manager.rs | 31 ++- crates/p2p/src/manager_stream.rs | 221 +++++++++++------- crates/p2p/src/spacetime/behaviour.rs | 11 +- crates/p2p/src/spacetime/connection.rs | 4 +- crates/p2p/src/spacetime/proto_inbound.rs | 8 +- crates/p2p/src/spacetunnel/identity.rs | 17 ++ crates/p2p/src/utils/metadata.rs | 8 +- .../app/$libraryId/settings/library/nodes.tsx | 80 +++++-- packages/client/src/core.ts | 9 +- packages/client/src/hooks/useP2PEvents.tsx | 19 +- 33 files changed, 503 insertions(+), 267 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c51e87b9a..f4a5f843a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7047,7 +7047,7 @@ dependencies = [ "tauri-build", "tauri-specta", "tokio", - "tracing 0.1.37", + "tracing 0.2.0", "uuid", "window-shadows", ] @@ -7087,7 +7087,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", - "tracing 0.1.37", + "tracing 0.2.0", "webp", ] @@ -7126,7 +7126,7 @@ version = "0.1.0" dependencies = [ "jni 0.19.0", "sd-mobile-core", - "tracing 0.1.37", + "tracing 0.2.0", ] [[package]] @@ -7143,7 +7143,7 @@ dependencies = [ "sd-core", "serde_json", "tokio", - "tracing 0.1.37", + "tracing 0.2.0", ] [[package]] @@ -7175,8 +7175,8 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "tracing 0.1.37", - "tracing-subscriber 0.3.17", + "tracing 0.2.0", + "tracing-subscriber 0.3.0", "uuid", ] @@ -7204,7 +7204,7 @@ dependencies = [ "sd-core", "tokio", "tower-http", - "tracing 0.1.37", + "tracing 0.2.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d28cd39be..5ef9763d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,12 @@ prisma-client-rust-sdk = { git = "https://github.com/Brendonovich/prisma-client- "sqlite", ], default-features = false } +tracing = { git = 
"https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e" } # To work with tracing-appender +tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e", features = [ + "env-filter", +] } # To work with tracing-appender +tracing-appender = { git = "https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e" } # Unreleased changes for rolling log deletion + rspc = { version = "0.1.4" } specta = { version = "1.0.4" } httpz = { version = "0.0.3" } diff --git a/apps/desktop/src-tauri/Cargo.toml b/apps/desktop/src-tauri/Cargo.toml index 946af0168..1e6e39ce4 100644 --- a/apps/desktop/src-tauri/Cargo.toml +++ b/apps/desktop/src-tauri/Cargo.toml @@ -32,7 +32,7 @@ sd-core = { path = "../../../core", features = [ ] } tokio = { workspace = true, features = ["sync"] } window-shadows = "0.2.1" -tracing = "0.1.37" +tracing = { workspace = true } serde = "1.0.163" percent-encoding = "2.2.0" http = "0.2.9" diff --git a/apps/desktop/src-tauri/src/main.rs b/apps/desktop/src-tauri/src/main.rs index 4055cffc9..9aa1e1d72 100644 --- a/apps/desktop/src-tauri/src/main.rs +++ b/apps/desktop/src-tauri/src/main.rs @@ -97,11 +97,11 @@ async fn main() -> tauri::Result<()> { #[cfg(debug_assertions)] let data_dir = data_dir.join("dev"); - // Initialize and configure app logging - // Return value must be assigned to variable for flushing remaining logs on main exit throught Drop - let _guard = Node::init_logger(&data_dir); - - let result = Node::new(data_dir).await; + // The `_guard` must be assigned to variable for flushing remaining logs on main exit through Drop + let (_guard, result) = match Node::init_logger(&data_dir) { + Ok(guard) => (Some(guard), Node::new(data_dir).await), + Err(err) => (None, Err(NodeError::Logger(err))), + }; let app = tauri::Builder::default(); diff --git a/apps/mobile/crates/android/Cargo.toml b/apps/mobile/crates/android/Cargo.toml index 88bcf978c..e4336933e 100644 --- a/apps/mobile/crates/android/Cargo.toml +++ b/apps/mobile/crates/android/Cargo.toml @@ -18,4 +18,4 @@ jni = "0.19.0" sd-mobile-core = { path = "../core" } # Other -tracing = "0.1.37" +tracing = { workspace = true } diff --git a/apps/mobile/crates/core/Cargo.toml b/apps/mobile/crates/core/Cargo.toml index d1fbe19d1..acc4d5182 100644 --- a/apps/mobile/crates/core/Cargo.toml +++ b/apps/mobile/crates/core/Cargo.toml @@ -21,6 +21,6 @@ openssl-sys = { version = "0.9.88", features = [ "vendored", ] } # Override features of transitive dependencies to support IOS Simulator on M1 futures = "0.3.28" -tracing = "0.1.37" +tracing = { workspace = true } futures-channel = "0.3.28" futures-locks = "0.7.1" diff --git a/apps/server/Cargo.toml b/apps/server/Cargo.toml index 3ee4a663a..558baafbd 100644 --- a/apps/server/Cargo.toml +++ b/apps/server/Cargo.toml @@ -18,7 +18,7 @@ rspc = { workspace = true, features = ["axum"] } httpz = { workspace = true, features = ["axum"] } axum = "0.6.18" tokio = { workspace = true, features = ["sync", "rt-multi-thread", "signal"] } -tracing = "0.1.37" +tracing = { workspace = true } ctrlc = "3.3.1" http = "0.2.9" tower-http = { version = "0.4.0", features = ["fs"] } diff --git a/apps/server/src/main.rs b/apps/server/src/main.rs index ced5e3579..15db3efbc 100644 --- a/apps/server/src/main.rs +++ b/apps/server/src/main.rs @@ -32,7 +32,12 @@ async fn main() { .map(|port| port.parse::().unwrap_or(8080)) .unwrap_or(8080); - let _guard = Node::init_logger(&data_dir); + let _guard = match 
Node::init_logger(&data_dir) { + Ok(guard) => guard, + Err(e) => { + panic!("{}", e.to_string()) + } + }; let (node, router) = match Node::new(data_dir).await { Ok(d) => d, diff --git a/core/Cargo.toml b/core/Cargo.toml index b028e512c..f25ecdb8d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -70,8 +70,8 @@ include_dir = { version = "0.7.3", features = ["glob"] } async-trait = "^0.1.68" image = "0.24.6" webp = "0.2.2" -tracing = { git = "https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e" } # To work with tracing-appender -tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e", features = [ +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = [ "env-filter", ] } async-stream = "0.3.5" @@ -90,7 +90,7 @@ notify = { version = "5.2.0", default-features = false, features = [ static_assertions = "1.1.0" serde-hashkey = "0.4.5" normpath = { version = "1.1.1", features = ["localization"] } -tracing-appender = { git = "https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e" } # Unreleased changes for log deletion +tracing-appender = { workspace = true } strum = { version = "0.24", features = ["derive"] } strum_macros = "0.24" regex = "1.8.4" diff --git a/core/src/api/libraries.rs b/core/src/api/libraries.rs index d8bce3d6a..0cd3e559b 100644 --- a/core/src/api/libraries.rs +++ b/core/src/api/libraries.rs @@ -6,6 +6,7 @@ use crate::{ use chrono::Utc; use rspc::alpha::AlphaRouter; +use sd_p2p::spacetunnel::RemoteIdentity; use sd_prisma::prisma::statistics; use serde::{Deserialize, Serialize}; use specta::Type; @@ -18,9 +19,11 @@ use super::{ }; // TODO(@Oscar): Replace with `specta::json` -#[derive(Serialize, Deserialize, Type)] +#[derive(Serialize, Type)] pub struct LibraryConfigWrapped { pub uuid: Uuid, + pub instance_id: Uuid, + pub instance_public_key: RemoteIdentity, pub config: LibraryConfig, } @@ -34,6 +37,8 @@ pub(crate) fn mount() -> AlphaRouter { .into_iter() .map(|lib| LibraryConfigWrapped { uuid: lib.id, + instance_id: lib.instance_uuid, + instance_public_key: lib.identity.to_remote_identity(), config: lib.config.clone(), }) .collect::>() @@ -114,6 +119,8 @@ pub(crate) fn mount() -> AlphaRouter { Ok(LibraryConfigWrapped { uuid: library.id, + instance_id: library.instance_uuid, + instance_public_key: library.identity.to_remote_identity(), config: library.config.clone(), }) }) diff --git a/core/src/api/p2p.rs b/core/src/api/p2p.rs index 01e4087c2..f1a98ed94 100644 --- a/core/src/api/p2p.rs +++ b/core/src/api/p2p.rs @@ -23,11 +23,13 @@ pub(crate) fn mount() -> AlphaRouter { }; } - // // TODO: Don't block subscription start - // for peer in ctx.p2p_manager.get_connected_peers().await.unwrap() { - // // TODO: Send to frontend - // } + // TODO: Don't block subscription start + for peer_id in node.p2p.manager.get_connected_peers().await.unwrap() { + yield P2PEvent::ConnectedPeer { + peer_id, + }; + } while let Ok(event) = rx.recv().await { yield event; @@ -35,6 +37,9 @@ pub(crate) fn mount() -> AlphaRouter { } }) }) + .procedure("nlmState", { + R.query(|node, _: ()| async move { node.nlm.state().await }) + }) .procedure("spacedrop", { #[derive(Type, Deserialize)] pub struct SpacedropArgs { diff --git a/core/src/lib.rs b/core/src/lib.rs index e502068e9..4b1b96310 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -14,7 +14,7 @@ use notifications::Notifications; pub use sd_prisma::*; use std::{ - fmt, + env, fmt, path::{Path, 
PathBuf}, sync::Arc, }; @@ -26,7 +26,12 @@ use tracing_appender::{ non_blocking::{NonBlocking, WorkerGuard}, rolling::{RollingFileAppender, Rotation}, }; -use tracing_subscriber::{fmt as tracing_fmt, prelude::*, EnvFilter}; +use tracing_subscriber::{ + filter::{Directive, FromEnvError, LevelFilter}, + fmt as tracing_fmt, + prelude::*, + EnvFilter, +}; pub mod api; pub mod custom_uri; @@ -111,19 +116,20 @@ impl Node { init_data.apply(&node.libraries, &node).await?; } + // Finally load the libraries from disk into the library manager + node.libraries.init(&node).await?; + + // It's import these are run after libraries are loaded! locations_actor.start(node.clone()); jobs_actor.start(node.clone()); node.p2p.start(p2p_stream, node.clone()); - // Finally load the libraries from disk into the library manager - node.libraries.init(&node).await?; - let router = api::mount(); info!("Spacedrive online."); Ok((node, router)) } - pub fn init_logger(data_dir: impl AsRef) -> WorkerGuard { + pub fn init_logger(data_dir: impl AsRef) -> Result { let (logfile, guard) = NonBlocking::new( RollingFileAppender::builder() .filename_prefix("sd.log") @@ -133,6 +139,17 @@ impl Node { .expect("Error setting up log file!"), ); + // Set a default if the user hasn't set an override + if env::var("RUST_LOG") == Err(env::VarError::NotPresent) { + let directive: Directive = if cfg!(debug_assertions) { + LevelFilter::DEBUG + } else { + LevelFilter::INFO + } + .into(); + env::set_var("RUST_LOG", directive.to_string()); + } + let collector = tracing_subscriber::registry() .with( tracing_fmt::Subscriber::new() @@ -142,49 +159,12 @@ impl Node { .with( tracing_fmt::Subscriber::new() .with_writer(std::io::stdout) - .with_filter(if cfg!(debug_assertions) { - EnvFilter::from_default_env() - .add_directive( - "warn".parse().expect("Error invalid tracing directive!"), - ) - .add_directive( - "sd_core=debug" - .parse() - .expect("Error invalid tracing directive!"), - ) - .add_directive( - "sd_core::location::manager=info" - .parse() - .expect("Error invalid tracing directive!"), - ) - .add_directive( - "sd_core_mobile=debug" - .parse() - .expect("Error invalid tracing directive!"), - ) - // .add_directive( - // "sd_p2p=debug" - // .parse() - // .expect("Error invalid tracing directive!"), - // ) - .add_directive( - "server=debug" - .parse() - .expect("Error invalid tracing directive!"), - ) - .add_directive( - "spacedrive=debug" - .parse() - .expect("Error invalid tracing directive!"), - ) - .add_directive( - "rspc=debug" - .parse() - .expect("Error invalid tracing directive!"), - ) - } else { - EnvFilter::from("info") - }), + .with_filter( + EnvFilter::builder() + .from_env()? 
+ // We don't wanna blow up the logs + .add_directive("sd_core::location::manager=info".parse()?), + ), ); tracing::collect::set_global_default(collector) @@ -193,13 +173,19 @@ impl Node { }) .ok(); - let prev_hook = std::panic::take_hook(); - std::panic::set_hook(Box::new(move |panic_info| { - error!("{}", panic_info); - prev_hook(panic_info); + std::panic::set_hook(Box::new(move |panic| { + if let Some(location) = panic.location() { + tracing::error!( + message = %panic, + panic.file = format!("{}:{}", location.file(), location.line()), + panic.column = location.column(), + ); + } else { + tracing::error!(message = %panic); + } })); - guard + Ok(guard) } pub async fn shutdown(&self) { @@ -254,4 +240,6 @@ pub enum NodeError { #[cfg(debug_assertions)] #[error("Init config error: {0}")] InitConfig(#[from] util::debug_initializer::InitConfigError), + #[error("logger error: {0}")] + Logger(#[from] FromEnvError), } diff --git a/core/src/library/library.rs b/core/src/library/library.rs index 8a89e5749..8ae20a234 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -49,6 +49,8 @@ pub struct Library { /// p2p identity pub identity: Arc, pub orphan_remover: OrphanRemoverActor, + // The UUID which matches `config.instance_id`'s primary key. + pub instance_uuid: Uuid, notifications: notifications::Notifications, @@ -63,6 +65,7 @@ impl Debug for Library { // troublesome to implement Debug trait f.debug_struct("LibraryContext") .field("id", &self.id) + .field("instance_uuid", &self.instance_uuid) .field("config", &self.config) .field("db", &self.db) .finish() @@ -73,6 +76,7 @@ impl Library { pub async fn new( id: Uuid, config: LibraryConfig, + instance_uuid: Uuid, identity: Arc, db: Arc, node: &Arc, @@ -87,6 +91,7 @@ impl Library { identity: identity.clone(), orphan_remover: OrphanRemoverActor::spawn(db), notifications: node.notifications.clone(), + instance_uuid, event_bus_tx: node.event_bus.0.clone(), }) } diff --git a/core/src/library/manager/mod.rs b/core/src/library/manager/mod.rs index 56e89cbee..0db3262d0 100644 --- a/core/src/library/manager/mod.rs +++ b/core/src/library/manager/mod.rs @@ -384,6 +384,7 @@ impl Libraries { let library = Library::new( id, config, + instance_id, identity, // key_manager, db, diff --git a/core/src/object/thumbnail_remover.rs b/core/src/object/thumbnail_remover.rs index ae6ce153c..9a69289fb 100644 --- a/core/src/object/thumbnail_remover.rs +++ b/core/src/object/thumbnail_remover.rs @@ -250,6 +250,9 @@ impl Actor { //└── [0..2]/ # sharding // └── .webp + fs::create_dir_all(&thumbnails_directory) + .await + .map_err(|e| FileIOError::from((thumbnails_directory, e)))?; let mut read_dir = fs::read_dir(thumbnails_directory) .await .map_err(|e| FileIOError::from((thumbnails_directory, e)))?; diff --git a/core/src/p2p/p2p_manager.rs b/core/src/p2p/p2p_manager.rs index 6ab4d73a1..334ddc81e 100644 --- a/core/src/p2p/p2p_manager.rs +++ b/core/src/p2p/p2p_manager.rs @@ -45,6 +45,15 @@ pub enum P2PEvent { peer_id: PeerId, metadata: PeerMetadata, }, + ExpiredPeer { + peer_id: PeerId, + }, + ConnectedPeer { + peer_id: PeerId, + }, + DisconnectedPeer { + peer_id: PeerId, + }, SpacedropRequest { id: Uuid, peer_id: PeerId, @@ -108,10 +117,6 @@ impl P2PManager { let pairing = PairingManager::new(manager.clone(), tx.clone(), metadata_manager.clone()); - // TODO: proper shutdown - // https://docs.rs/ctrlc/latest/ctrlc/ - // https://docs.rs/system_shutdown/latest/system_shutdown/ - Ok(( Arc::new(Self { pairing, @@ -141,11 +146,6 @@ impl P2PManager { while let 
Some(event) = stream.next().await { match event { Event::PeerDiscovered(event) => { - debug!( - "Discovered peer by id '{}' with address '{:?}' and metadata: {:?}", - event.peer_id, event.addresses, event.metadata - ); - events .send(P2PEvent::DiscoveredPeer { peer_id: event.peer_id, @@ -156,12 +156,22 @@ impl P2PManager { node.nlm.peer_discovered(event).await; } - Event::PeerExpired { id, metadata } => { - debug!("Peer '{}' expired with metadata: {:?}", id, metadata); + Event::PeerExpired { id, .. } => { + events + .send(P2PEvent::ExpiredPeer { peer_id: id }) + .map_err(|_| error!("Failed to send event to p2p event stream!")) + .ok(); + node.nlm.peer_expired(id).await; } Event::PeerConnected(event) => { - debug!("Peer '{}' connected", event.peer_id); + events + .send(P2PEvent::ConnectedPeer { + peer_id: event.peer_id, + }) + .map_err(|_| error!("Failed to send event to p2p event stream!")) + .ok(); + node.nlm.peer_connected(event.peer_id).await; if event.establisher { @@ -175,7 +185,11 @@ impl P2PManager { } } Event::PeerDisconnected(peer_id) => { - debug!("Peer '{}' disconnected", peer_id); + events + .send(P2PEvent::DisconnectedPeer { peer_id }) + .map_err(|_| error!("Failed to send event to p2p event stream!")) + .ok(); + node.nlm.peer_disconnected(peer_id).await; } Event::PeerMessage(event) => { @@ -297,7 +311,7 @@ impl P2PManager { shutdown = true; break; } - _ => debug!("event: {:?}", event), + _ => {} } } diff --git a/core/src/p2p/pairing/mod.rs b/core/src/p2p/pairing/mod.rs index 456b22bac..96ccd285d 100644 --- a/core/src/p2p/pairing/mod.rs +++ b/core/src/p2p/pairing/mod.rs @@ -286,6 +286,7 @@ impl PairingManager { .exec() .await .unwrap(); + library_manager.update_instances(library.clone()).await; stream .write_all( diff --git a/core/src/p2p/peer_metadata.rs b/core/src/p2p/peer_metadata.rs index b19deb689..a4760820e 100644 --- a/core/src/p2p/peer_metadata.rs +++ b/core/src/p2p/peer_metadata.rs @@ -1,9 +1,10 @@ use std::{collections::HashMap, env, str::FromStr}; use itertools::Itertools; -use sd_p2p::{spacetunnel::RemoteIdentity, Metadata}; +use sd_p2p::{spacetunnel::RemoteIdentity, Metadata, PeerId}; use serde::{Deserialize, Serialize}; use specta::Type; +use tracing::warn; use crate::node::Platform; @@ -54,7 +55,7 @@ impl Metadata for PeerMetadata { map } - fn from_hashmap(data: &HashMap) -> Result + fn from_hashmap(peer_id: &PeerId, data: &HashMap) -> Result where Self: Sized, { @@ -81,20 +82,31 @@ impl Metadata for PeerMetadata { i += 1; } - if instances.is_empty() { - return Err("DNS record for field 'instances' missing. Unable to decode 'PeerMetadata'!" - .to_owned()); - } - instances .split(',') - .map(|s| { + .filter_map(|s| { + // "".split(",").collect::>() == [""] + if s.is_empty() { + return None; + } + RemoteIdentity::from_bytes( - &hex::decode(s).map_err(|_| "Unable to decode instance!")?, + &hex::decode(s) + .map_err(|e| { + warn!( + "Unable to parse instance from peer '{peer_id}'s metadata!" + ); + e + }) + .ok()?, ) - .map_err(|_| "Unable to parse instance!") + .map_err(|e| { + warn!("Unable to parse instance from peer '{peer_id}'s metadata!"); + e + }) + .ok() }) - .collect::, _>>()? 
+ .collect::>() }, }) } diff --git a/core/src/p2p/sync/mod.rs b/core/src/p2p/sync/mod.rs index f009ee444..736c231fa 100644 --- a/core/src/p2p/sync/mod.rs +++ b/core/src/p2p/sync/mod.rs @@ -7,6 +7,8 @@ use sd_p2p::{ DiscoveredPeer, PeerId, }; use sd_sync::CRDTOperation; +use serde::Serialize; +use specta::Type; use sync::GetOpsArgs; use tokio::{ @@ -26,17 +28,20 @@ use super::{Header, IdentityOrRemoteIdentity, P2PManager, PeerMetadata}; mod proto; pub use proto::*; -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Serialize, Type)] pub enum InstanceState { Unavailable, Discovered(PeerId), Connected(PeerId), } +#[derive(Debug, Clone, Serialize, Type)] pub struct LibraryData { instances: HashMap, } +type LibrariesMap = HashMap; + pub struct NetworkedLibraries { p2p: Arc, pub(crate) libraries: RwLock>, @@ -112,11 +117,11 @@ impl NetworkedLibraries { .filter_map(|i| { // TODO: Error handling match IdentityOrRemoteIdentity::from_bytes(&i.identity).unwrap() { - IdentityOrRemoteIdentity::Identity(identity) => { - Some((identity.to_remote_identity(), InstanceState::Unavailable)) - } // We don't own it so don't advertise it - IdentityOrRemoteIdentity::RemoteIdentity(_) => None, + IdentityOrRemoteIdentity::Identity(_) => None, + IdentityOrRemoteIdentity::RemoteIdentity(identity) => { + Some((identity, InstanceState::Unavailable)) + } } }) .collect(), @@ -212,6 +217,10 @@ impl NetworkedLibraries { } } } + + pub async fn state(&self) -> LibrariesMap { + self.libraries.read().await.clone() + } } // These functions could be moved to some separate protocol abstraction @@ -291,7 +300,7 @@ mod originator { .unwrap(); tunnel.flush().await.unwrap(); - while let Ok(rx::GetOperations(args)) = + while let Ok(rx::GetOperations::Operations(args)) = rx::GetOperations::from_stream(&mut tunnel).await { let ops = sync.get_ops(args).await.unwrap(); @@ -318,28 +327,31 @@ mod responder { use originator::tx as rx; pub mod tx { + use serde::{Deserialize, Serialize}; + use super::*; - pub struct GetOperations(pub GetOpsArgs); + #[derive(Serialize, Deserialize)] + pub enum GetOperations { + Operations(GetOpsArgs), + Done, + } impl GetOperations { // TODO: Per field errors for better error handling pub async fn from_stream( stream: &mut (impl AsyncRead + Unpin), ) -> std::io::Result { - Ok(Self( + Ok( // TODO: Error handling rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(), - )) + ) } pub fn to_bytes(&self) -> Vec { - let Self(ops) = self; - let mut buf = vec![]; - // TODO: Error handling - encode::buf(&mut buf, &rmp_serde::to_vec_named(&ops).unwrap()); + encode::buf(&mut buf, &rmp_serde::to_vec_named(&self).unwrap()); buf } } @@ -349,6 +361,16 @@ mod responder { let ingest = &library.sync.ingest; let Ok(mut rx) = ingest.req_rx.try_lock() else { + println!("Rejected sync due to libraries lock being held!"); + + // TODO: Proper error returned to remote instead of this. + // TODO: We can't just abort the connection when the remote is expecting data. 
+ tunnel + .write_all(&tx::GetOperations::Done.to_bytes()) + .await + .unwrap(); + tunnel.flush().await.unwrap(); + return; }; @@ -369,7 +391,7 @@ mod responder { tunnel .write_all( - &tx::GetOperations(sync::GetOpsArgs { + &tx::GetOperations::Operations(sync::GetOpsArgs { clocks: timestamps, count: OPS_PER_REQUEST, }) @@ -391,5 +413,11 @@ mod responder { .await .expect("TODO: Handle ingest channel closed, so we don't loose ops"); } + + tunnel + .write_all(&tx::GetOperations::Done.to_bytes()) + .await + .unwrap(); + tunnel.flush().await.unwrap(); } } diff --git a/crates/ffmpeg/Cargo.toml b/crates/ffmpeg/Cargo.toml index 0099ae093..a09f539fb 100644 --- a/crates/ffmpeg/Cargo.toml +++ b/crates/ffmpeg/Cargo.toml @@ -11,7 +11,7 @@ edition = { workspace = true } [dependencies] ffmpeg-sys-next = "6.0.1" -tracing = "0.1.37" +tracing = { workspace = true } thiserror = "1.0.40" webp = "0.2.2" diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index 06578204b..f5ee54a86 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -27,7 +27,7 @@ if-watch = { version = "3.0.1", features = [ ] } # Override the features of if-watch which is used by libp2p-quic mdns-sd = "0.6.1" thiserror = "1.0.40" -tracing = "0.1.37" +tracing = { workspace = true } serde = { version = "1.0.163", features = ["derive"] } rmp-serde = "1.1.1" specta = { workspace = true } @@ -41,4 +41,4 @@ hex = "0.4.3" [dev-dependencies] tokio = { workspace = true, features = ["rt-multi-thread"] } -tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } +tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/crates/p2p/examples/basic.rs b/crates/p2p/examples/basic.rs index 28f572357..35c286162 100644 --- a/crates/p2p/examples/basic.rs +++ b/crates/p2p/examples/basic.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, env, time::Duration}; -use sd_p2p::{Event, Keypair, Manager, Metadata, MetadataManager}; +use sd_p2p::{Event, Keypair, Manager, Metadata, MetadataManager, PeerId}; use tokio::{io::AsyncReadExt, time::sleep}; use tracing::{debug, error, info}; @@ -14,7 +14,7 @@ impl Metadata for PeerMetadata { HashMap::from([("name".to_owned(), self.name)]) } - fn from_hashmap(data: &HashMap) -> Result + fn from_hashmap(_: &PeerId, data: &HashMap) -> Result where Self: Sized, { diff --git a/crates/p2p/src/discovery/mdns.rs b/crates/p2p/src/discovery/mdns.rs index 32149ba77..543cc5f9b 100644 --- a/crates/p2p/src/discovery/mdns.rs +++ b/crates/p2p/src/discovery/mdns.rs @@ -12,7 +12,7 @@ use tokio::{ sync::{mpsc, RwLock}, time::{sleep_until, Instant, Sleep}, }; -use tracing::{error, info, warn}; +use tracing::{debug, error, trace, warn}; use crate::{DiscoveredPeer, Event, Manager, Metadata, MetadataManager, PeerId}; @@ -38,6 +38,7 @@ where mdns_service_receiver: flume::Receiver, service_name: String, next_mdns_advertisement: Pin>, + next_allowed_discovery_advertisement: Instant, trigger_advertisement: mpsc::UnboundedReceiver<()>, pub(crate) state: Arc>, } @@ -70,6 +71,7 @@ where mdns_service_receiver, service_name, next_mdns_advertisement: Box::pin(sleep_until(Instant::now())), // Trigger an advertisement immediately + next_allowed_discovery_advertisement: Instant::now(), trigger_advertisement: advertise_rx, state: state.clone(), }, @@ -84,6 +86,13 @@ where /// Do an mdns advertisement to the network. 
async fn advertise(&mut self) { + self.inner_advertise().await; + + self.next_mdns_advertisement = + Box::pin(sleep_until(Instant::now() + MDNS_READVERTISEMENT_INTERVAL)); + } + + async fn inner_advertise(&self) { let metadata = self.metadata_manager.get().to_hashmap(); // This is in simple terms converts from `Vec<(ip, port)>` to `Vec<(Vec, port)>` @@ -119,15 +128,12 @@ where } for (_, service) in services.into_iter() { - info!("advertising mdns service: {:?}", service); + trace!("advertising mdns service: {:?}", service); match self.mdns_daemon.register(service) { Ok(_) => {} Err(err) => warn!("error registering mdns service: {}", err), } } - - self.next_mdns_advertisement = - Box::pin(sleep_until(Instant::now() + MDNS_READVERTISEMENT_INTERVAL)); } // TODO: if the channel's sender is dropped will this cause the `tokio::select` in the `manager.rs` to infinitely loop? @@ -153,6 +159,7 @@ where } match TMetadata::from_hashmap( + &peer_id, &info .get_properties() .iter() @@ -165,9 +172,19 @@ where self.state.discovered.write().await; let peer = if let Some(peer) = discovered_peers.remove(&peer_id) { - peer } else { + // Found a new peer, let's readvertise our mdns service as it may have just come online + // `self.last_discovery_advertisement` is to prevent DOS-style attacks. + let now = Instant::now(); + if self.next_allowed_discovery_advertisement <= now { + self.next_allowed_discovery_advertisement = now + Duration::from_secs(1); + + self.inner_advertise().await; + self.next_mdns_advertisement = + Box::pin(sleep_until(Instant::now() + MDNS_READVERTISEMENT_INTERVAL)); + } + DiscoveredPeer { manager: manager.clone(), peer_id, @@ -188,11 +205,13 @@ where discovered_peers.insert(peer_id, peer.clone()); peer }; + debug!( + "Discovered peer by id '{}' with address '{:?}' and metadata: {:?}", + peer.peer_id, peer.addresses, peer.metadata + ); return Some(Event::PeerDiscovered(peer)); } - Err(err) => { - error!("error parsing metadata for peer '{}': {}", raw_peer_id, err) - } + Err(err) => error!("error parsing metadata for peer '{}': {}", raw_peer_id, err) } } Err(_) => warn!( @@ -216,9 +235,11 @@ where self.state.discovered.write().await; let peer = discovered_peers.remove(&peer_id); + let metadata = peer.map(|p| p.metadata); + debug!("Peer '{peer_id}' expired with metadata: {metadata:?}"); return Some(Event::PeerExpired { id: peer_id, - metadata: peer.map(|p| p.metadata), + metadata, }); } } diff --git a/crates/p2p/src/manager.rs b/crates/p2p/src/manager.rs index c44519002..c128532d7 100644 --- a/crates/p2p/src/manager.rs +++ b/crates/p2p/src/manager.rs @@ -10,12 +10,12 @@ use std::{ use libp2p::{core::muxing::StreamMuxerBox, swarm::SwarmBuilder, Transport}; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; -use tracing::{debug, error, warn}; +use tracing::{error, trace, warn}; use crate::{ spacetime::{SpaceTime, UnicastStream}, - DiscoveredPeer, Keypair, ManagerStream, ManagerStreamAction, Mdns, MdnsState, Metadata, - MetadataManager, PeerId, + DiscoveredPeer, Keypair, ManagerStream, ManagerStreamAction, ManagerStreamAction2, Mdns, + MdnsState, Metadata, MetadataManager, PeerId, }; /// Is the core component of the P2P system that holds the state and delegates actions to the other components @@ -25,7 +25,8 @@ pub struct Manager { pub(crate) peer_id: PeerId, pub(crate) application_name: &'static [u8], pub(crate) stream_id: AtomicU64, - event_stream_tx: mpsc::Sender>, + event_stream_tx: mpsc::Sender, + event_stream_tx2: mpsc::Sender>, } impl Manager { @@ -42,7 +43,8 @@ impl Manager { 
.ok_or(ManagerError::InvalidAppName)?; let peer_id = PeerId(keypair.raw_peer_id()); - let (event_stream_tx, event_stream_rx) = mpsc::channel(1024); + let (event_stream_tx, event_stream_rx) = mpsc::channel(128); + let (event_stream_tx2, event_stream_rx2) = mpsc::channel(128); let (mdns, mdns_state) = Mdns::new(application_name, peer_id, metadata_manager) .await @@ -58,6 +60,7 @@ impl Manager { stream_id: AtomicU64::new(0), peer_id, event_stream_tx, + event_stream_tx2, }); let mut swarm = SwarmBuilder::with_tokio_executor( @@ -74,13 +77,13 @@ impl Manager { let listener_id = swarm .listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse().expect("Error passing libp2p multiaddr. This value is hardcoded so this should be impossible.")) .unwrap(); - debug!("created ipv4 listener with id '{:?}'", listener_id); + trace!("created ipv4 listener with id '{:?}'", listener_id); } { let listener_id = swarm .listen_on("/ip6/::/udp/0/quic-v1".parse().expect("Error passing libp2p multiaddr. This value is hardcoded so this should be impossible.")) .unwrap(); - debug!("created ipv4 listener with id '{:?}'", listener_id); + trace!("created ipv4 listener with id '{:?}'", listener_id); } Ok(( @@ -88,6 +91,7 @@ impl Manager { ManagerStream { manager: this, event_stream_rx, + event_stream_rx2, swarm, mdns, queued_events: Default::default(), @@ -97,7 +101,7 @@ impl Manager { )) } - pub(crate) async fn emit(&self, event: ManagerStreamAction) { + pub(crate) async fn emit(&self, event: ManagerStreamAction) { match self.event_stream_tx.send(event).await { Ok(_) => {} Err(err) => warn!("error emitting event: {}", err), @@ -131,12 +135,19 @@ impl Manager { } // TODO: Does this need any timeouts to be added cause hanging forever is bad? + // be aware this method is `!Sync` so can't be used from rspc. // TODO: Can this limitation be removed? #[allow(clippy::unused_unit)] // TODO: Remove this clippy override once error handling is added pub async fn stream(&self, peer_id: PeerId) -> Result { // TODO: With this system you can send to any random peer id. Can I reduce that by requiring `.connect(peer_id).unwrap().send(data)` or something like that. let (tx, rx) = oneshot::channel(); - self.emit(ManagerStreamAction::StartStream(peer_id, tx)) - .await; + match self + .event_stream_tx2 + .send(ManagerStreamAction2::StartStream(peer_id, tx)) + .await + { + Ok(_) => {} + Err(err) => warn!("error emitting event: {}", err), + } let mut stream = rx.await.map_err(|_| { warn!("failed to queue establishing stream to peer '{peer_id}'!"); diff --git a/crates/p2p/src/manager_stream.rs b/crates/p2p/src/manager_stream.rs index 2dc182295..e405c0fa6 100644 --- a/crates/p2p/src/manager_stream.rs +++ b/crates/p2p/src/manager_stream.rs @@ -17,7 +17,7 @@ use libp2p::{ Swarm, }; use tokio::sync::{mpsc, oneshot}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; use crate::{ quic_multiaddr_to_socketaddr, socketaddr_to_quic_multiaddr, @@ -26,9 +26,9 @@ use crate::{ }; /// TODO -pub enum ManagerStreamAction { - /// Events are returned to the application via the `ManagerStream::next` method. - Event(Event), +/// +/// This is `Sync` so it can be used from within rspc. +pub enum ManagerStreamAction { /// TODO GetConnectedPeers(oneshot::Sender>), /// Tell the [`libp2p::Swarm`](libp2p::Swarm) to establish a new connection to a peer. @@ -37,20 +37,34 @@ pub enum ManagerStreamAction { addresses: Vec, }, /// TODO - StartStream(PeerId, oneshot::Sender), - /// TODO BroadcastData(Vec), /// the node is shutting down. 
The `ManagerStream` should convert this into `Event::Shutdown` Shutdown(oneshot::Sender<()>), } -impl fmt::Debug for ManagerStreamAction { +/// TODO: Get ride of this and merge into `ManagerStreamAction` without breaking rspc procedures +/// +/// This is `!Sync` so can't be used from within rspc. +pub enum ManagerStreamAction2 { + /// Events are returned to the application via the `ManagerStream::next` method. + Event(Event), + /// TODO + StartStream(PeerId, oneshot::Sender), +} + +impl fmt::Debug for ManagerStreamAction { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("ManagerStreamAction") } } -impl From> for ManagerStreamAction { +impl fmt::Debug for ManagerStreamAction2 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("ManagerStreamAction2") + } +} + +impl From> for ManagerStreamAction2 { fn from(event: Event) -> Self { Self::Event(event) } @@ -59,7 +73,8 @@ impl From> for ManagerStreamAction { pub(crate) manager: Arc>, - pub(crate) event_stream_rx: mpsc::Receiver>, + pub(crate) event_stream_rx: mpsc::Receiver, + pub(crate) event_stream_rx2: mpsc::Receiver>, pub(crate) swarm: Swarm>, pub(crate) mdns: Mdns, pub(crate) queued_events: VecDeque>, @@ -67,6 +82,25 @@ pub struct ManagerStream { pub(crate) on_establish_streams: HashMap>, } +enum EitherManagerStreamAction { + A(ManagerStreamAction), + B(ManagerStreamAction2), +} + +impl From for EitherManagerStreamAction { + fn from(event: ManagerStreamAction) -> Self { + Self::A(event) + } +} + +impl From> + for EitherManagerStreamAction +{ + fn from(event: ManagerStreamAction2) -> Self { + Self::B(event) + } +} + impl ManagerStream where TMetadata: Metadata, @@ -92,14 +126,20 @@ where }, event = self.event_stream_rx.recv() => { // If the sender has shut down we return `None` to also shut down too. - if let Some(event) = self.handle_manager_stream_action(event?).await { + if let Some(event) = self.handle_manager_stream_action(event?.into()).await { + return Some(event); + } + } + event = self.event_stream_rx2.recv() => { + // If the sender has shut down we return `None` to also shut down too. + if let Some(event) = self.handle_manager_stream_action(event?.into()).await { return Some(event); } } event = self.swarm.select_next_some() => { match event { SwarmEvent::Behaviour(event) => { - if let Some(event) = self.handle_manager_stream_action(event).await { + if let Some(event) = self.handle_manager_stream_action(event.into()).await { if let Event::Shutdown { .. } = event { self.shutdown.store(true, Ordering::Relaxed); } @@ -128,7 +168,7 @@ where SwarmEvent::NewListenAddr { address, .. } => { match quic_multiaddr_to_socketaddr(address) { Ok(addr) => { - debug!("listen address added: {}", addr); + trace!("listen address added: {}", addr); self.mdns.register_addr(addr).await; return Some(Event::AddListenAddr(addr)); }, @@ -141,7 +181,7 @@ where SwarmEvent::ExpiredListenAddr { address, .. 
} => { match quic_multiaddr_to_socketaddr(address) { Ok(addr) => { - debug!("listen address added: {}", addr); + trace!("listen address expired: {}", addr); self.mdns.unregister_addr(&addr).await; return Some(Event::RemoveListenAddr(addr)); }, @@ -152,11 +192,11 @@ where } } SwarmEvent::ListenerClosed { listener_id, addresses, reason } => { - debug!("listener '{:?}' was closed due to: {:?}", listener_id, reason); + trace!("listener '{:?}' was closed due to: {:?}", listener_id, reason); for address in addresses { match quic_multiaddr_to_socketaddr(address) { Ok(addr) => { - debug!("listen address added: {}", addr); + trace!("listen address closed: {}", addr); self.mdns.unregister_addr(&addr).await; self.queued_events.push_back(Event::RemoveListenAddr(addr)); @@ -182,50 +222,24 @@ where async fn handle_manager_stream_action( &mut self, - event: ManagerStreamAction, + event: EitherManagerStreamAction, ) -> Option> { match event { - ManagerStreamAction::Event(event) => return Some(event), - ManagerStreamAction::GetConnectedPeers(response) => { - response - .send( - self.swarm - .connected_peers() - .map(|v| PeerId(*v)) - .collect::>(), - ) - .map_err(|_| { - error!("Error sending response to `GetConnectedPeers` request! Sending was dropped!") - }) - .ok(); - } - ManagerStreamAction::Dial { peer_id, addresses } => { - match self.swarm.dial( - DialOpts::peer_id(peer_id.0) - .condition(PeerCondition::Disconnected) - .addresses(addresses.iter().map(socketaddr_to_quic_multiaddr).collect()) - .build(), - ) { - Ok(()) => {} - Err(err) => warn!( - "error dialing peer '{}' with addresses '{:?}': {}", - peer_id, addresses, err - ), + EitherManagerStreamAction::A(event) => match event { + ManagerStreamAction::GetConnectedPeers(response) => { + response + .send( + self.swarm + .connected_peers() + .map(|v| PeerId(*v)) + .collect::>(), + ) + .map_err(|_| { + error!("Error sending response to `GetConnectedPeers` request! 
Sending was dropped!") + }) + .ok(); } - } - ManagerStreamAction::StartStream(peer_id, tx) => { - if !self.swarm.connected_peers().any(|v| *v == peer_id.0) { - let addresses = self - .mdns - .state - .discovered - .read() - .await - .get(&peer_id) - .unwrap() - .addresses - .clone(); - + ManagerStreamAction::Dial { peer_id, addresses } => { match self.swarm.dial( DialOpts::peer_id(peer_id.0) .condition(PeerCondition::Disconnected) @@ -238,43 +252,74 @@ where peer_id, addresses, err ), } - - self.on_establish_streams - .entry(peer_id.0) - .or_default() - .push(OutboundRequest::Unicast(tx)); - } else { - self.swarm - .behaviour_mut() - .pending_events - .push_back(ToSwarm::NotifyHandler { - peer_id: peer_id.0, + } + ManagerStreamAction::BroadcastData(data) => { + let connected_peers = self.swarm.connected_peers().copied().collect::>(); + let behaviour = self.swarm.behaviour_mut(); + debug!("Broadcasting message to '{:?}'", connected_peers); + for peer_id in connected_peers { + behaviour.pending_events.push_back(ToSwarm::NotifyHandler { + peer_id, handler: NotifyHandler::Any, - event: OutboundRequest::Unicast(tx), + event: OutboundRequest::Broadcast(data.clone()), }); + } } - } - ManagerStreamAction::BroadcastData(data) => { - let connected_peers = self.swarm.connected_peers().copied().collect::>(); - let behaviour = self.swarm.behaviour_mut(); - debug!("Broadcasting message to '{:?}'", connected_peers); - for peer_id in connected_peers { - behaviour.pending_events.push_back(ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: OutboundRequest::Broadcast(data.clone()), + ManagerStreamAction::Shutdown(tx) => { + info!("Shutting down P2P Manager..."); + self.mdns.shutdown().await; + tx.send(()).unwrap_or_else(|_| { + warn!("Error sending shutdown signal to P2P Manager!"); }); - } - } - ManagerStreamAction::Shutdown(tx) => { - info!("Shutting down P2P Manager..."); - self.mdns.shutdown().await; - tx.send(()).unwrap_or_else(|_| { - warn!("Error sending shutdown signal to P2P Manager!"); - }); - return Some(Event::Shutdown); - } + return Some(Event::Shutdown); + } + }, + EitherManagerStreamAction::B(event) => match event { + ManagerStreamAction2::Event(event) => return Some(event), + ManagerStreamAction2::StartStream(peer_id, tx) => { + if !self.swarm.connected_peers().any(|v| *v == peer_id.0) { + let addresses = self + .mdns + .state + .discovered + .read() + .await + .get(&peer_id) + .unwrap() + .addresses + .clone(); + + match self.swarm.dial( + DialOpts::peer_id(peer_id.0) + .condition(PeerCondition::Disconnected) + .addresses( + addresses.iter().map(socketaddr_to_quic_multiaddr).collect(), + ) + .build(), + ) { + Ok(()) => {} + Err(err) => warn!( + "error dialing peer '{}' with addresses '{:?}': {}", + peer_id, addresses, err + ), + } + + self.on_establish_streams + .entry(peer_id.0) + .or_default() + .push(OutboundRequest::Unicast(tx)); + } else { + self.swarm.behaviour_mut().pending_events.push_back( + ToSwarm::NotifyHandler { + peer_id: peer_id.0, + handler: NotifyHandler::Any, + event: OutboundRequest::Unicast(tx), + }, + ); + } + } + }, } None diff --git a/crates/p2p/src/spacetime/behaviour.rs b/crates/p2p/src/spacetime/behaviour.rs index b6f345675..4e0cb11c3 100644 --- a/crates/p2p/src/spacetime/behaviour.rs +++ b/crates/p2p/src/spacetime/behaviour.rs @@ -16,7 +16,7 @@ use libp2p::{ use thiserror::Error; use tracing::debug; -use crate::{ConnectedPeer, Event, Manager, ManagerStreamAction, Metadata, PeerId}; +use crate::{ConnectedPeer, Event, Manager, 
ManagerStreamAction2, Metadata, PeerId}; use super::SpaceTimeConnection; @@ -53,7 +53,7 @@ impl SpaceTime { impl NetworkBehaviour for SpaceTime { type ConnectionHandler = SpaceTimeConnection; - type OutEvent = ManagerStreamAction; + type OutEvent = ManagerStreamAction2; fn handle_established_inbound_connection( &mut self, @@ -116,13 +116,14 @@ impl NetworkBehaviour for SpaceTime { debug!("sending establishment request to peer '{}'", peer_id); if other_established == 0 { self.pending_events.push_back(ToSwarm::GenerateEvent( - ManagerStreamAction::Event(Event::PeerConnected(ConnectedPeer { + Event::PeerConnected(ConnectedPeer { peer_id, establisher: match endpoint { ConnectedPoint::Dialer { .. } => true, ConnectedPoint::Listener { .. } => false, }, - })), + }) + .into(), )); } } @@ -136,7 +137,7 @@ impl NetworkBehaviour for SpaceTime { if remaining_established == 0 { debug!("Disconnected from peer '{}'", peer_id); self.pending_events.push_back(ToSwarm::GenerateEvent( - ManagerStreamAction::Event(Event::PeerDisconnected(peer_id)), + Event::PeerDisconnected(peer_id).into(), )); } } diff --git a/crates/p2p/src/spacetime/connection.rs b/crates/p2p/src/spacetime/connection.rs index 86a5fea82..ca4c2aca8 100644 --- a/crates/p2p/src/spacetime/connection.rs +++ b/crates/p2p/src/spacetime/connection.rs @@ -14,7 +14,7 @@ use std::{ }; use tracing::error; -use crate::{Manager, ManagerStreamAction, Metadata, PeerId}; +use crate::{Manager, ManagerStreamAction2, Metadata, PeerId}; use super::{InboundProtocol, OutboundProtocol, OutboundRequest, EMPTY_QUEUE_SHRINK_THRESHOLD}; @@ -49,7 +49,7 @@ impl SpaceTimeConnection { impl ConnectionHandler for SpaceTimeConnection { type InEvent = OutboundRequest; - type OutEvent = ManagerStreamAction; + type OutEvent = ManagerStreamAction2; type Error = ConnectionHandlerUpgrErr; type InboundProtocol = InboundProtocol; type OutboundProtocol = OutboundProtocol; diff --git a/crates/p2p/src/spacetime/proto_inbound.rs b/crates/p2p/src/spacetime/proto_inbound.rs index 8b29b002b..3722f4cf9 100644 --- a/crates/p2p/src/spacetime/proto_inbound.rs +++ b/crates/p2p/src/spacetime/proto_inbound.rs @@ -11,7 +11,7 @@ use tracing::debug; use crate::{ spacetime::{BroadcastStream, UnicastStream}, - Manager, ManagerStreamAction, Metadata, PeerId, PeerMessageEvent, + Manager, ManagerStreamAction2, Metadata, PeerId, PeerMessageEvent, }; use super::SpaceTimeProtocolName; @@ -31,7 +31,7 @@ impl UpgradeInfo for InboundProtocol { } impl InboundUpgrade for InboundProtocol { - type Output = ManagerStreamAction; + type Output = ManagerStreamAction2; type Error = (); type Future = Pin> + Send + 'static>>; @@ -48,7 +48,7 @@ impl InboundUpgrade for InboundProtoco match discriminator { crate::spacetime::BROADCAST_DISCRIMINATOR => { debug!("stream({}, {id}): broadcast stream accepted", self.peer_id); - Ok(ManagerStreamAction::Event( + Ok(ManagerStreamAction2::Event( PeerMessageEvent { stream_id: id, peer_id: self.peer_id, @@ -62,7 +62,7 @@ impl InboundUpgrade for InboundProtoco crate::spacetime::UNICAST_DISCRIMINATOR => { debug!("stream({}, {id}): unicast stream accepted", self.peer_id); - Ok(ManagerStreamAction::Event( + Ok(ManagerStreamAction2::Event( PeerMessageEvent { stream_id: id, peer_id: self.peer_id, diff --git a/crates/p2p/src/spacetunnel/identity.rs b/crates/p2p/src/spacetunnel/identity.rs index 4a0cca268..babe8fb45 100644 --- a/crates/p2p/src/spacetunnel/identity.rs +++ b/crates/p2p/src/spacetunnel/identity.rs @@ -2,6 +2,8 @@ use std::hash::{Hash, Hasher}; use ed25519_dalek::PublicKey; use 
rand_core::OsRng; +use serde::Serialize; +use specta::Type; use thiserror::Error; #[derive(Debug, Error)] @@ -58,6 +60,21 @@ impl std::fmt::Debug for RemoteIdentity { } } +impl Serialize for RemoteIdentity { + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_str(&hex::encode(self.0.as_bytes())) + } +} + +impl Type for RemoteIdentity { + fn inline( + _: specta::DefOpts, + _: &[specta::DataType], + ) -> Result { + Ok(specta::DataType::Primitive(specta::PrimitiveType::String)) + } +} + impl RemoteIdentity { pub fn from_bytes(bytes: &[u8]) -> Result { Ok(Self(ed25519_dalek::PublicKey::from_bytes(bytes)?)) diff --git a/crates/p2p/src/utils/metadata.rs b/crates/p2p/src/utils/metadata.rs index 3373c805a..e6c348c41 100644 --- a/crates/p2p/src/utils/metadata.rs +++ b/crates/p2p/src/utils/metadata.rs @@ -1,10 +1,12 @@ -use std::collections::HashMap; +use std::{collections::HashMap, fmt::Debug}; + +use crate::PeerId; /// this trait must be implemented for the metadata type to allow it to be converted to MDNS DNS records. -pub trait Metadata: Clone + Send + Sync + 'static { +pub trait Metadata: Debug + Clone + Send + Sync + 'static { fn to_hashmap(self) -> HashMap; - fn from_hashmap(data: &HashMap) -> Result + fn from_hashmap(peer_id: &PeerId, data: &HashMap) -> Result where Self: Sized; } diff --git a/interface/app/$libraryId/settings/library/nodes.tsx b/interface/app/$libraryId/settings/library/nodes.tsx index 74da6a66a..9636d85d2 100644 --- a/interface/app/$libraryId/settings/library/nodes.tsx +++ b/interface/app/$libraryId/settings/library/nodes.tsx @@ -1,4 +1,11 @@ -import { useBridgeMutation, useDiscoveredPeers, useFeatureFlag } from '@sd/client'; +import { + isEnabled, + useBridgeMutation, + useBridgeQuery, + useConnectedPeers, + useDiscoveredPeers, + useFeatureFlag +} from '@sd/client'; import { Button } from '@sd/ui'; import { startPairing } from '~/app/p2p/pairing'; import { Heading } from '../Layout'; @@ -23,34 +30,69 @@ export const Component = () => { // TODO: This entire component shows a UI which is pairing by node but that is just not how it works. function IncorrectP2PPairingPane() { const onlineNodes = useDiscoveredPeers(); + const connectedNodes = useConnectedPeers(); const p2pPair = useBridgeMutation('p2p.pair', { onSuccess(data) { console.log(data); } }); + const nlmState = useBridgeQuery(['p2p.nlmState'], { + refetchInterval: 1000 + }); + const libraries = useBridgeQuery(['library.list']); return ( <> -

-			[JSX garbled during extraction. Recoverable old body: a "Pairing" heading and
-			{[...onlineNodes.entries()].map(([id, node]) => ...)} rendering {node.name}
-			with a "Pair" <Button>.]
+			[JSX garbled during extraction. Recoverable new body: a "Pairing" section (the same
+			list of online nodes, each rendering {node.name} with a "Pair" <Button>), a
+			"Connected" section ({[...connectedNodes.entries()].map(([id, node]) => ...)}
+			rendering {id}), an "NLM State:" block containing a <pre> of
+			{JSON.stringify(nlmState.data || {}, undefined, 2)}, and a "Libraries:" list from
+			{libraries.data?.map((v) => ...)} rendering {v.config.name} - {v.uuid},
+			Instance: {`${v.config.instance_id}/${v.instance_id}`}, and
+			Instance PK: {`${v.instance_public_key}`}.]
); } diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index 1ea550660..cc8921943 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -24,6 +24,7 @@ export type Procedures = { { key: "notifications.dismiss", input: NotificationId, result: null } | { key: "notifications.dismissAll", input: never, result: null } | { key: "notifications.get", input: never, result: Notification[] } | + { key: "p2p.nlmState", input: never, result: { [key: string]: LibraryData } } | { key: "preferences.get", input: LibraryArgs, result: LibraryPreferences } | { key: "search.ephemeralPaths", input: LibraryArgs, result: NonIndexedFileSystemEntries } | { key: "search.objects", input: LibraryArgs, result: SearchData } | @@ -178,6 +179,8 @@ export type IndexerRule = { id: number; pub_id: number[]; name: string | null; d */ export type IndexerRuleCreateArgs = { name: string; dry_run: boolean; rules: ([RuleKind, string[]])[] } +export type InstanceState = "Unavailable" | { Discovered: PeerId } | { Connected: PeerId } + export type InvalidateOperationEvent = { key: string; arg: any; result: any | null } export type JobGroup = { id: string; action: string | null; status: JobStatus; created_at: string; jobs: JobReport[] } @@ -198,7 +201,9 @@ export type LibraryArgs = { library_id: string; arg: T } */ export type LibraryConfig = { name: LibraryName; description: string | null; instance_id: number } -export type LibraryConfigWrapped = { uuid: string; config: LibraryConfig } +export type LibraryConfigWrapped = { uuid: string; instance_id: string; instance_public_key: string; config: LibraryConfig } + +export type LibraryData = { instances: { [key: string]: InstanceState } } export type LibraryName = string @@ -285,7 +290,7 @@ export type OptionalRange = { from: T | null; to: T | null } /** * TODO: P2P event for the frontend */ -export type P2PEvent = { type: "DiscoveredPeer"; peer_id: PeerId; metadata: PeerMetadata } | { type: "SpacedropRequest"; id: string; peer_id: PeerId; name: string } | { type: "PairingRequest"; id: number; name: string; os: OperatingSystem } | { type: "PairingProgress"; id: number; status: PairingStatus } +export type P2PEvent = { type: "DiscoveredPeer"; peer_id: PeerId; metadata: PeerMetadata } | { type: "ExpiredPeer"; peer_id: PeerId } | { type: "ConnectedPeer"; peer_id: PeerId } | { type: "DisconnectedPeer"; peer_id: PeerId } | { type: "SpacedropRequest"; id: string; peer_id: PeerId; name: string } | { type: "PairingRequest"; id: number; name: string; os: OperatingSystem } | { type: "PairingProgress"; id: number; status: PairingStatus } export type PairingDecision = { decision: "accept"; libraryId: string } | { decision: "reject" } diff --git a/packages/client/src/hooks/useP2PEvents.tsx b/packages/client/src/hooks/useP2PEvents.tsx index 9be18c382..4a80cdded 100644 --- a/packages/client/src/hooks/useP2PEvents.tsx +++ b/packages/client/src/hooks/useP2PEvents.tsx @@ -12,6 +12,7 @@ import { useBridgeSubscription } from '../rspc'; type Context = { discoveredPeers: Map; + connectedPeers: Map; pairingStatus: Map; events: MutableRefObject; }; @@ -21,6 +22,7 @@ const Context = createContext(null as any); export function P2PContextProvider({ children }: PropsWithChildren) { const events = useRef(new EventTarget()); const [[discoveredPeers], setDiscoveredPeer] = useState([new Map()]); + const [[connectedPeers], setConnectedPeers] = useState([new Map()]); const [[pairingStatus], setPairingStatus] = useState([new Map()]); useBridgeSubscription(['p2p.events'], { @@ 
-28,7 +30,17 @@ export function P2PContextProvider({ children }: PropsWithChildren) { events.current.dispatchEvent(new CustomEvent('p2p-event', { detail: data })); if (data.type === 'DiscoveredPeer') { - setDiscoveredPeer([discoveredPeers.set(data.peer_id, data.metadata)]); + discoveredPeers.set(data.peer_id, data.metadata); + setDiscoveredPeer([discoveredPeers]); + } else if (data.type === 'ExpiredPeer') { + discoveredPeers.delete(data.peer_id); + setDiscoveredPeer([discoveredPeers]); + } else if (data.type === 'ConnectedPeer') { + connectedPeers.set(data.peer_id, undefined); + setConnectedPeers([connectedPeers]); + } else if (data.type === 'DisconnectedPeer') { + connectedPeers.delete(data.peer_id); + setConnectedPeers([connectedPeers]); } else if (data.type === 'PairingProgress') { setPairingStatus([pairingStatus.set(data.id, data.status)]); } @@ -39,6 +51,7 @@ export function P2PContextProvider({ children }: PropsWithChildren) {
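The patch is truncated before `useP2PEvents.tsx` shows how the new `connectedPeers` map reaches components; `nodes.tsx` above imports a `useConnectedPeers` hook from `@sd/client` for exactly that. As a rough sketch only, assuming the hook mirrors `useDiscoveredPeers` and simply reads the new map off the P2P context (`Context`) created in `useP2PEvents.tsx`, it could look like the following; the real hook in the package may differ.

// Hypothetical sketch, not the actual @sd/client implementation: assumes the
// context value built by P2PContextProvider now includes `connectedPeers`.
import { useContext } from 'react';

export function useConnectedPeers() {
	// Consumers re-render when P2PContextProvider updates its state, since the
	// provider passes down a fresh context value containing the updated map.
	return useContext(Context).connectedPeers;
}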