diff --git a/apps/mobile/src/screens/settings/library/NodesSettings.tsx b/apps/mobile/src/screens/settings/library/NodesSettings.tsx index 0e8602907..bfb468fbb 100644 --- a/apps/mobile/src/screens/settings/library/NodesSettings.tsx +++ b/apps/mobile/src/screens/settings/library/NodesSettings.tsx @@ -14,7 +14,7 @@ const NodesSettingsScreen = ({ navigation }: SettingsStackScreenProps<'NodesSett {[...onlineNodes.entries()].map(([id, node]) => ( - {node.name} + {node.metadata.name} ))} diff --git a/core/src/api/p2p.rs b/core/src/api/p2p.rs index 6706ddeda..06ee83c3a 100644 --- a/core/src/api/p2p.rs +++ b/core/src/api/p2p.rs @@ -1,6 +1,6 @@ -use crate::p2p::{operations, Header, P2PEvent, PeerMetadata}; +use crate::p2p::{operations, ConnectionMethod, DiscoveryMethod, Header, P2PEvent, PeerMetadata}; -use sd_p2p2::RemoteIdentity; +use sd_p2p2::{PeerConnectionCandidate, RemoteIdentity}; use rspc::{alpha::AlphaRouter, ErrorCode}; use serde::Deserialize; @@ -19,17 +19,29 @@ pub(crate) fn mount() -> AlphaRouter { let mut queued = Vec::new(); - for (identity, peer, metadata) in - node.p2p.p2p.peers().iter().filter_map(|(i, p)| { - PeerMetadata::from_hashmap(&p.metadata()) - .ok() - .map(|m| (i, p, m)) - }) { - let identity = *identity; - match peer.is_connected() { - true => queued.push(P2PEvent::ConnectedPeer { identity }), - false => queued.push(P2PEvent::DiscoveredPeer { identity, metadata }), - } + for (_, peer, metadata) in node.p2p.p2p.peers().iter().filter_map(|(i, p)| { + PeerMetadata::from_hashmap(&p.metadata()) + .ok() + .map(|m| (i, p, m)) + }) { + queued.push(P2PEvent::PeerChange { + identity: peer.identity(), + connection: if peer.is_connected_with_hook(node.p2p.libraries_hook_id) { + ConnectionMethod::Relay + } else if peer.is_connected() { + ConnectionMethod::Local + } else { + ConnectionMethod::Disconnected + }, + discovery: match peer + .connection_candidates() + .contains(&PeerConnectionCandidate::Relay) + { + true => DiscoveryMethod::Relay, + false => DiscoveryMethod::Local, + }, + metadata, + }); } Ok(async_stream::stream! { diff --git a/core/src/p2p/connect_hook.rs b/core/src/p2p/connect_hook.rs deleted file mode 100644 index f52812841..000000000 --- a/core/src/p2p/connect_hook.rs +++ /dev/null @@ -1,26 +0,0 @@ -// TODO: This is unused but will be used in the future. -// use std::sync::Arc; - -// use sd_p2p2::{flume::bounded, HookEvent, P2P}; - -// /// A P2P hook which listens for the availability of peers and connects with them. -// pub struct ConnectHook {} - -// impl ConnectHook { -// pub fn spawn(p2p: Arc) -> Self { -// let (tx, rx) = bounded(15); -// let _ = p2p.register_hook("sd-connect-hook", tx); - -// tokio::spawn(async move { -// while let Ok(event) = rx.recv_async().await { -// match event { -// // TODO: Do the thing. For now we don't need this. -// HookEvent::Shutdown { _guard } => break, -// _ => continue, -// } -// } -// }); - -// Self {} -// } -// } diff --git a/core/src/p2p/events.rs b/core/src/p2p/events.rs index 1ed4c3ed8..ebcda606b 100644 --- a/core/src/p2p/events.rs +++ b/core/src/p2p/events.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use sd_p2p2::{flume::bounded, HookEvent, RemoteIdentity, P2P}; +use sd_p2p2::{flume::bounded, HookEvent, HookId, PeerConnectionCandidate, RemoteIdentity, P2P}; use serde::Serialize; use specta::Type; use tokio::sync::broadcast; @@ -8,21 +8,41 @@ use uuid::Uuid; use super::PeerMetadata; -/// TODO: P2P event for the frontend +/// The method used for the connection with this peer. +/// *Technically* you can have multiple under the hood but this simplifies things for the UX. +#[derive(Debug, Clone, Serialize, Type)] +pub enum ConnectionMethod { + // Connected via the SD Relay + Relay, + // Connected directly via an IP address + Local, + // Not connected + Disconnected, +} + +/// The method used for the discovery of this peer. +/// *Technically* you can have multiple under the hood but this simplifies things for the UX. +#[derive(Debug, Clone, Serialize, Type)] +pub enum DiscoveryMethod { + // Found via the SD Relay + Relay, + // Found via mDNS or a manual IP + Local, +} + +// This is used for synchronizing events between the backend and the frontend. #[derive(Debug, Clone, Serialize, Type)] #[serde(tag = "type")] pub enum P2PEvent { - DiscoveredPeer { + // An add or update event + PeerChange { identity: RemoteIdentity, + connection: ConnectionMethod, + discovery: DiscoveryMethod, metadata: PeerMetadata, }, - ExpiredPeer { - identity: RemoteIdentity, - }, - ConnectedPeer { - identity: RemoteIdentity, - }, - DisconnectedPeer { + // Delete a peer + PeerDelete { identity: RemoteIdentity, }, SpacedropRequest { @@ -49,7 +69,7 @@ pub struct P2PEvents { } impl P2PEvents { - pub fn spawn(p2p: Arc) -> Self { + pub fn spawn(p2p: Arc, libraries_hook_id: HookId) -> Self { let events = broadcast::channel(15); let (tx, rx) = bounded(15); let _ = p2p.register_hook("sd-frontend-events", tx); @@ -60,7 +80,7 @@ impl P2PEvents { let event = match event { // We use `HookEvent::PeerUnavailable`/`HookEvent::PeerAvailable` over `HookEvent::PeerExpiredBy`/`HookEvent::PeerDiscoveredBy` so that having an active connection is treated as "discovered". // It's possible to have an active connection without mDNS data (which is what Peer*By` are for) - HookEvent::PeerAvailable(peer) => { + HookEvent::PeerConnectedWith(_, peer) | HookEvent::PeerAvailable(peer) => { let metadata = match PeerMetadata::from_hashmap(&peer.metadata()) { Ok(metadata) => metadata, Err(e) => { @@ -73,15 +93,26 @@ impl P2PEvents { } }; - P2PEvent::DiscoveredPeer { + P2PEvent::PeerChange { identity: peer.identity(), + connection: if peer.is_connected_with_hook(libraries_hook_id) { + ConnectionMethod::Relay + } else if peer.is_connected() { + ConnectionMethod::Local + } else { + ConnectionMethod::Disconnected + }, + discovery: match peer + .connection_candidates() + .contains(&PeerConnectionCandidate::Relay) + { + true => DiscoveryMethod::Relay, + false => DiscoveryMethod::Local, + }, metadata, } } - HookEvent::PeerUnavailable(identity) => P2PEvent::ExpiredPeer { identity }, - HookEvent::PeerConnectedWith(_, peer) => P2PEvent::ConnectedPeer { - identity: peer.identity(), - }, + HookEvent::PeerUnavailable(identity) => P2PEvent::PeerDelete { identity }, HookEvent::PeerDisconnectedWith(_, identity) => { let peers = p2p.peers(); let Some(peer) = peers.get(&identity) else { @@ -89,7 +120,7 @@ impl P2PEvents { }; if !peer.is_connected() { - P2PEvent::DisconnectedPeer { identity } + P2PEvent::PeerDelete { identity } } else { continue; } diff --git a/core/src/p2p/libraries.rs b/core/src/p2p/libraries.rs index 616dc9541..1951cde40 100644 --- a/core/src/p2p/libraries.rs +++ b/core/src/p2p/libraries.rs @@ -1,6 +1,8 @@ use std::{collections::HashMap, sync::Arc}; -use sd_p2p2::{flume::bounded, HookEvent, IdentityOrRemoteIdentity, PeerConnectionCandidate, P2P}; +use sd_p2p2::{ + flume::bounded, HookEvent, HookId, IdentityOrRemoteIdentity, PeerConnectionCandidate, P2P, +}; use tracing::error; use crate::library::{Libraries, LibraryManagerEvent}; @@ -10,94 +12,93 @@ use crate::library::{Libraries, LibraryManagerEvent}; /// This hooks is responsible for: /// - injecting library peers into the P2P system so we can connect to them over internet. /// -pub struct LibrariesHook {} +pub fn libraries_hook(p2p: Arc, libraries: Arc) -> HookId { + let (tx, rx) = bounded(15); + let hook_id = p2p.register_hook("sd-libraries-hook", tx); -impl LibrariesHook { - pub fn spawn(p2p: Arc, libraries: Arc) -> Self { - let (tx, rx) = bounded(15); - let hook_id = p2p.register_hook("sd-libraries-hook", tx); + let handle = tokio::spawn(async move { + if let Err(err) = libraries + .rx + .clone() + .subscribe(|msg| { + let p2p = p2p.clone(); + async move { + match msg { + LibraryManagerEvent::InstancesModified(library) + | LibraryManagerEvent::Load(library) => { + p2p.metadata_mut().insert( + library.id.to_string(), + library.identity.to_remote_identity().to_string(), + ); - let handle = tokio::spawn(async move { - if let Err(err) = libraries - .rx - .clone() - .subscribe(|msg| { - let p2p = p2p.clone(); - async move { - match msg { - LibraryManagerEvent::InstancesModified(library) - | LibraryManagerEvent::Load(library) => { - p2p.metadata_mut().insert( - library.id.to_string(), - library.identity.to_remote_identity().to_string(), + let Ok(instances) = + library.db.instance().find_many(vec![]).exec().await + else { + return; + }; + + for i in instances.iter() { + let identity = IdentityOrRemoteIdentity::from_bytes(&i.identity) + .expect("lol: invalid DB entry") + .remote_identity(); + + // Skip self + if identity == library.identity.to_remote_identity() { + continue; + } + + p2p.clone().discover_peer( + hook_id, + identity, + HashMap::new(), // TODO: We should probs cache this so we have something + [PeerConnectionCandidate::Relay].into_iter().collect(), ); - - let Ok(instances) = - library.db.instance().find_many(vec![]).exec().await - else { - return; - }; - - for i in instances.iter() { - let identity = - IdentityOrRemoteIdentity::from_bytes(&i.identity) - .expect("lol: invalid DB entry") - .remote_identity(); - - p2p.clone().discover_peer( - hook_id, - identity, - HashMap::new(), // TODO: We should probs cache this so we have something - [PeerConnectionCandidate::Relay].into_iter().collect(), - ); - } } - LibraryManagerEvent::Edit(_library) => { - // TODO: Send changes to all connected nodes or queue sending for when they are online! - } - LibraryManagerEvent::Delete(library) => { - p2p.metadata_mut().remove(&library.id.to_string()); + } + LibraryManagerEvent::Edit(_library) => { + // TODO: Send changes to all connected nodes or queue sending for when they are online! + } + LibraryManagerEvent::Delete(library) => { + p2p.metadata_mut().remove(&library.id.to_string()); - let Ok(instances) = - library.db.instance().find_many(vec![]).exec().await - else { - return; + let Ok(instances) = + library.db.instance().find_many(vec![]).exec().await + else { + return; + }; + + for i in instances.iter() { + let identity = IdentityOrRemoteIdentity::from_bytes(&i.identity) + .expect("lol: invalid DB entry") + .remote_identity(); + + let peers = p2p.peers(); + let Some(peer) = peers.get(&identity) else { + continue; }; - - for i in instances.iter() { - let identity = - IdentityOrRemoteIdentity::from_bytes(&i.identity) - .expect("lol: invalid DB entry") - .remote_identity(); - - let peers = p2p.peers(); - let Some(peer) = peers.get(&identity) else { - continue; - }; - peer.undiscover_peer(hook_id); - } + peer.undiscover_peer(hook_id); } } } - }) - .await - { - error!("Core may become unstable! `LibraryServices::start` manager aborted with error: {err:?}"); - } - }); - - tokio::spawn(async move { - while let Ok(event) = rx.recv_async().await { - match event { - HookEvent::Shutdown { _guard } => { - handle.abort(); - break; - } - _ => continue, } - } - }); + }) + .await + { + error!("Core may become unstable! `LibraryServices::start` manager aborted with error: {err:?}"); + } + }); - Self {} - } + tokio::spawn(async move { + while let Ok(event) = rx.recv_async().await { + match event { + HookEvent::Shutdown { _guard } => { + handle.abort(); + break; + } + _ => continue, + } + } + }); + + hook_id } diff --git a/core/src/p2p/manager.rs b/core/src/p2p/manager.rs index 4b0a0ceed..ee01a013a 100644 --- a/core/src/p2p/manager.rs +++ b/core/src/p2p/manager.rs @@ -3,7 +3,10 @@ use crate::{ config::{self, P2PDiscoveryState, Port}, get_hardware_model_name, HardwareModel, }, - p2p::{operations, sync::SyncMessage, Header, OperatingSystem, SPACEDRIVE_APP_ID}, + p2p::{ + libraries::libraries_hook, operations, sync::SyncMessage, Header, OperatingSystem, + SPACEDRIVE_APP_ID, + }, Node, }; @@ -11,7 +14,7 @@ use axum::routing::IntoMakeService; use sd_p2p2::{ flume::{bounded, Receiver}, - Libp2pPeerId, Listener, Mdns, Peer, QuicTransport, RelayServerEntry, RemoteIdentity, + HookId, Libp2pPeerId, Listener, Mdns, Peer, QuicTransport, RelayServerEntry, RemoteIdentity, UnicastStream, P2P, }; use sd_p2p_tunnel::Tunnel; @@ -32,7 +35,7 @@ use tokio::sync::oneshot; use tracing::info; use uuid::Uuid; -use super::{libraries::LibrariesHook, P2PEvents, PeerMetadata}; +use super::{P2PEvents, PeerMetadata}; pub struct P2PManager { pub(crate) p2p: Arc, @@ -45,6 +48,7 @@ pub struct P2PManager { pub(super) spacedrop_pairing_reqs: Arc>>>>, pub(super) spacedrop_cancellations: Arc>>>, pub(crate) node_config: Arc, + pub libraries_hook_id: HookId, } impl P2PManager { @@ -61,20 +65,20 @@ impl P2PManager { let (tx, rx) = bounded(25); let p2p = P2P::new(SPACEDRIVE_APP_ID, node_config.get().await.identity, tx); let (quic, lp2p_peer_id) = QuicTransport::spawn(p2p.clone())?; + let libraries_hook_id = libraries_hook(p2p.clone(), libraries); let this = Arc::new(Self { p2p: p2p.clone(), lp2p_peer_id, mdns: Mutex::new(None), quic, - events: P2PEvents::spawn(p2p.clone()), + events: P2PEvents::spawn(p2p.clone(), libraries_hook_id), spacedrop_pairing_reqs: Default::default(), spacedrop_cancellations: Default::default(), node_config, + libraries_hook_id, }); this.on_node_config_change().await; - LibrariesHook::spawn(this.p2p.clone(), libraries); - info!( "Node RemoteIdentity('{}') libp2p::PeerId('{:?}') is now online listening at addresses: {:?}", this.p2p.remote_identity(), @@ -104,7 +108,7 @@ impl P2PManager { } else { match resp.json::>().await { Ok(config) => { - this.quic.relay_config(config).await; + this.quic.set_relay_config(config).await; info!("Updated p2p relay configuration successfully.") } Err(err) => { @@ -252,8 +256,8 @@ impl P2PManager { "p2p_ipv4_port": node_config.p2p_ipv4_port, "p2p_ipv6_port": node_config.p2p_ipv6_port, "p2p_discovery": node_config.p2p_discovery, - }) - + }), + "relay_config": self.quic.get_relay_config(), }) } diff --git a/core/src/p2p/mod.rs b/core/src/p2p/mod.rs index c958fae5c..5cff15717 100644 --- a/core/src/p2p/mod.rs +++ b/core/src/p2p/mod.rs @@ -1,7 +1,6 @@ #![warn(clippy::all, clippy::unwrap_used, clippy::panic)] #![allow(clippy::unnecessary_cast)] // Yeah they aren't necessary on this arch, but they are on others -mod connect_hook; mod events; pub(super) mod libraries; mod manager; @@ -10,7 +9,6 @@ pub mod operations; mod protocol; pub mod sync; -// pub use connect_hook::*; pub use events::*; pub use manager::*; pub use metadata::*; diff --git a/crates/p2p2/src/peer.rs b/crates/p2p2/src/peer.rs index bde8a3ef6..f555f5cf4 100644 --- a/crates/p2p2/src/peer.rs +++ b/crates/p2p2/src/peer.rs @@ -24,7 +24,6 @@ pub struct Peer { } // The order of this enum is the preference of the connection type. - #[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd, Ord)] pub enum PeerConnectionCandidate { SocketAddr(SocketAddr), @@ -107,6 +106,25 @@ impl Peer { .is_empty() } + pub fn can_connect_with(&self, hook_id: HookId) -> bool { + self.state + .read() + .unwrap_or_else(PoisonError::into_inner) + .discovered + .contains_key(&hook_id) + } + + pub fn connection_candidates(&self) -> BTreeSet { + self.state + .read() + .unwrap_or_else(PoisonError::into_inner) + .discovered + .values() + .cloned() + .flatten() + .collect() + } + pub fn is_connected(&self) -> bool { !self .state @@ -124,6 +142,23 @@ impl Peer { .len() } + // TODO: Possibly remove this, it's not great??? + pub fn is_connected_with_hook(&self, hook_id: HookId) -> bool { + self.state + .read() + .unwrap_or_else(PoisonError::into_inner) + .active_connections + .contains_key(&ListenerId(hook_id.0)) + } + + pub fn is_connected_with(&self, listener_id: ListenerId) -> bool { + self.state + .read() + .unwrap_or_else(PoisonError::into_inner) + .active_connections + .contains_key(&listener_id) + } + pub fn connection_methods(&self) -> HashSet { self.state .read() diff --git a/crates/p2p2/src/quic/transport.rs b/crates/p2p2/src/quic/transport.rs index db82ce06e..39483562b 100644 --- a/crates/p2p2/src/quic/transport.rs +++ b/crates/p2p2/src/quic/transport.rs @@ -2,7 +2,7 @@ use std::{ collections::HashMap, net::{Ipv4Addr, Ipv6Addr, SocketAddr}, str::FromStr, - sync::{Arc, PoisonError, RwLock}, + sync::{Arc, Mutex, PoisonError, RwLock}, time::Duration, }; @@ -60,7 +60,7 @@ enum InternalEvent { }, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct RelayServerEntry { id: Uuid, peer_id: String, @@ -85,6 +85,7 @@ pub struct QuicTransport { id: ListenerId, p2p: Arc, internal_tx: Sender, + relay_config: Mutex>, } impl QuicTransport { @@ -125,6 +126,7 @@ impl QuicTransport { id, p2p, internal_tx, + relay_config: Mutex::new(Vec::new()), }, libp2p_peer_id, )) @@ -132,19 +134,34 @@ impl QuicTransport { /// Configure the relay servers to use. /// This method will replace any existing relay servers. - pub async fn relay_config(&self, relays: Vec) { + pub async fn set_relay_config(&self, relays: Vec) { let (tx, rx) = oneshot::channel(); - let event = InternalEvent::RegisterRelays { relays, result: tx }; + let event = InternalEvent::RegisterRelays { + relays: relays.clone(), + result: tx, + }; let Ok(_) = self.internal_tx.send(event) else { return; }; match rx.await.unwrap_or_else(|_| Ok(())) { - Ok(_) => {} + Ok(_) => { + *self + .relay_config + .lock() + .unwrap_or_else(PoisonError::into_inner) = relays; + } Err(e) => error!("Failed to register relay config as the event loop has died: {e}"), } } + pub fn get_relay_config(&self) -> Vec { + self.relay_config + .lock() + .unwrap_or_else(PoisonError::into_inner) + .clone() + } + // `None` on the port means disabled. Use `0` for random port. pub async fn set_ipv4_enabled(&self, port: Option) -> Result<(), String> { self.setup_listener( diff --git a/interface/app/$libraryId/Explorer/ContextMenu/SharedItems.tsx b/interface/app/$libraryId/Explorer/ContextMenu/SharedItems.tsx index 230610e3f..f8115633f 100644 --- a/interface/app/$libraryId/Explorer/ContextMenu/SharedItems.tsx +++ b/interface/app/$libraryId/Explorer/ContextMenu/SharedItems.tsx @@ -223,7 +223,7 @@ const SpacedropNodes = () => { return Array.from(discoveredPeers).map(([id, peer]) => ( { spacedrop.mutateAsync({ diff --git a/interface/app/$libraryId/Spacedrop/index.tsx b/interface/app/$libraryId/Spacedrop/index.tsx index 05a78811e..a11d68061 100644 --- a/interface/app/$libraryId/Spacedrop/index.tsx +++ b/interface/app/$libraryId/Spacedrop/index.tsx @@ -128,7 +128,8 @@ export function Spacedrop({ triggerClose }: { triggerClose: () => void }) { ))} @@ -142,10 +143,12 @@ export function Spacedrop({ triggerClose }: { triggerClose: () => void }) { function Node({ id, name, + model, onDropped }: { id: string; - name: HardwareModel; + name: string; + model: HardwareModel; onDropped: (id: string, files: string[]) => void; }) { const ref = useRef(null); @@ -178,7 +181,7 @@ function Node({ }); }} > - +

{name}

); diff --git a/interface/app/$libraryId/network.tsx b/interface/app/$libraryId/network.tsx index 984c1c39d..b5bff5572 100644 --- a/interface/app/$libraryId/network.tsx +++ b/interface/app/$libraryId/network.tsx @@ -39,7 +39,7 @@ export const Component = () => { has_local_thumbnail: false, thumbnail: null, item: { - ...peer, + ...peer.metadata, pub_id: [] } })), diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index ef1e185b1..a7976587c 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -190,6 +190,12 @@ export type Composite = */ "Live" +/** + * The method used for the connection with this peer. + * *Technically* you can have multiple under the hood but this simplifies things for the UX. + */ +export type ConnectionMethod = "Relay" | "Local" | "Disconnected" + export type ConvertImageArgs = { location_id: number; file_path_id: number; delete_src: boolean; desired_extension: ConvertibleExtension; quality_percentage: number | null } export type ConvertibleExtension = "bmp" | "dib" | "ff" | "gif" | "ico" | "jpg" | "jpeg" | "png" | "pnm" | "qoi" | "tga" | "icb" | "vda" | "vst" | "tiff" | "tif" | "hif" | "heif" | "heifs" | "heic" | "heics" | "avif" | "avci" | "avcs" | "svg" | "svgz" | "pdf" | "webp" @@ -204,6 +210,12 @@ export type CursorOrderItem = { order: SortOrder; data: T } export type DefaultLocations = { desktop: boolean; documents: boolean; downloads: boolean; pictures: boolean; music: boolean; videos: boolean } +/** + * The method used for the discovery of this peer. + * *Technically* you can have multiple under the hood but this simplifies things for the UX. + */ +export type DiscoveryMethod = "Relay" | "Local" + export type DiskType = "SSD" | "HDD" | "Removable" export type DoubleClickAction = "openFile" | "quickPreview" @@ -511,10 +523,7 @@ export type Orientation = "Normal" | "CW90" | "CW180" | "CW270" | "MirroredVerti export type P2PDiscoveryState = "Everyone" | "ContactsOnly" | "Disabled" -/** - * TODO: P2P event for the frontend - */ -export type P2PEvent = { type: "DiscoveredPeer"; identity: RemoteIdentity; metadata: PeerMetadata } | { type: "ExpiredPeer"; identity: RemoteIdentity } | { type: "ConnectedPeer"; identity: RemoteIdentity } | { type: "DisconnectedPeer"; identity: RemoteIdentity } | { type: "SpacedropRequest"; id: string; identity: RemoteIdentity; peer_name: string; files: string[] } | { type: "SpacedropProgress"; id: string; percent: number } | { type: "SpacedropTimedOut"; id: string } | { type: "SpacedropRejected"; id: string } +export type P2PEvent = { type: "PeerChange"; identity: RemoteIdentity; connection: ConnectionMethod; discovery: DiscoveryMethod; metadata: PeerMetadata } | { type: "PeerDelete"; identity: RemoteIdentity } | { type: "SpacedropRequest"; id: string; identity: RemoteIdentity; peer_name: string; files: string[] } | { type: "SpacedropProgress"; id: string; percent: number } | { type: "SpacedropTimedOut"; id: string } | { type: "SpacedropRejected"; id: string } export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; device_model: HardwareModel | null; version: string | null } diff --git a/packages/client/src/hooks/useP2PEvents.tsx b/packages/client/src/hooks/useP2PEvents.tsx index 6987e4644..299823282 100644 --- a/packages/client/src/hooks/useP2PEvents.tsx +++ b/packages/client/src/hooks/useP2PEvents.tsx @@ -8,12 +8,17 @@ import { useState } from 'react'; -import { P2PEvent, PeerMetadata } from '../core'; +import { ConnectionMethod, DiscoveryMethod, P2PEvent, PeerMetadata } from '../core'; import { useBridgeSubscription } from '../rspc'; +type Peer = { + connection: ConnectionMethod; + discovery: DiscoveryMethod; + metadata: PeerMetadata; +}; + type Context = { - discoveredPeers: Map; - connectedPeers: Map; + peers: Map; spacedropProgresses: Map; events: MutableRefObject; }; @@ -22,26 +27,23 @@ const Context = createContext(null as any); export function P2PContextProvider({ children }: PropsWithChildren) { const events = useRef(new EventTarget()); - const [[discoveredPeers], setDiscoveredPeer] = useState([new Map()]); - const [[connectedPeers], setConnectedPeers] = useState([new Map()]); + const [[peers], setPeers] = useState([new Map()]); const [[spacedropProgresses], setSpacedropProgresses] = useState([new Map()]); useBridgeSubscription(['p2p.events'], { onData(data) { events.current.dispatchEvent(new CustomEvent('p2p-event', { detail: data })); - if (data.type === 'DiscoveredPeer') { - discoveredPeers.set(data.identity, data.metadata); - setDiscoveredPeer([discoveredPeers]); - } else if (data.type === 'ExpiredPeer') { - discoveredPeers.delete(data.identity); - setDiscoveredPeer([discoveredPeers]); - } else if (data.type === 'ConnectedPeer') { - connectedPeers.set(data.identity, undefined); - setConnectedPeers([connectedPeers]); - } else if (data.type === 'DisconnectedPeer') { - connectedPeers.delete(data.identity); - setConnectedPeers([connectedPeers]); + if (data.type === 'PeerChange') { + peers.set(data.identity, { + connection: data.connection, + discovery: data.discovery, + metadata: data.metadata + }); + setPeers([peers]); + } else if (data.type === 'PeerDelete') { + peers.delete(data.identity); + setPeers([peers]); } else if (data.type === 'SpacedropProgress') { spacedropProgresses.set(data.id, data.percent); setSpacedropProgresses([spacedropProgresses]); @@ -52,8 +54,7 @@ export function P2PContextProvider({ children }: PropsWithChildren) { return ( peer.connection === 'Disconnected')); } export function useConnectedPeers() { - return useContext(Context).connectedPeers; + return new Map([...usePeers()].filter(([, peer]) => peer.connection !== 'Disconnected')); } export function useSpacedropProgress(id: string) {