Refactor P2P Hooks (#2193)

* Basic AF server

* wip

* Add autonat to relay server

* Add autonat client + fixes

* Deploy script

* wip

* Debug view

* wip

* wip

* relay all events

* wip

* fix

* wip

* libp2p man spoke

* DCUtR (Direct Connection Upgrade through Relay)

* Relay config file

* Advertise relay server

* Dynamic relay configuration

* wip

* p2p relay config

* cleanup

* push instances into p2p state

* fix

* Fix up TS

* refactor p2p hooks

* fix backend state

* Skip self

* a

* b

* Relay config in debug query

* Fix method name

* tsc is broken on my machine

* a

* Incorrect typecasts. Can we just ban them

* fix types
This commit is contained in:
Oscar Beaumont 2024-03-13 14:43:22 +08:00 committed by GitHub
parent 79530f1e4e
commit 2ca88a5d6b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 274 additions and 185 deletions

View file

@ -14,7 +14,7 @@ const NodesSettingsScreen = ({ navigation }: SettingsStackScreenProps<'NodesSett
{[...onlineNodes.entries()].map(([id, node]) => ( {[...onlineNodes.entries()].map(([id, node]) => (
<View key={id} style={tw`flex`}> <View key={id} style={tw`flex`}>
<Text style={tw`text-ink`}>{node.name}</Text> <Text style={tw`text-ink`}>{node.metadata.name}</Text>
</View> </View>
))} ))}
</ScreenContainer> </ScreenContainer>

View file

@ -1,6 +1,6 @@
use crate::p2p::{operations, Header, P2PEvent, PeerMetadata}; use crate::p2p::{operations, ConnectionMethod, DiscoveryMethod, Header, P2PEvent, PeerMetadata};
use sd_p2p2::RemoteIdentity; use sd_p2p2::{PeerConnectionCandidate, RemoteIdentity};
use rspc::{alpha::AlphaRouter, ErrorCode}; use rspc::{alpha::AlphaRouter, ErrorCode};
use serde::Deserialize; use serde::Deserialize;
@ -19,17 +19,29 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
let mut queued = Vec::new(); let mut queued = Vec::new();
for (identity, peer, metadata) in for (_, peer, metadata) in node.p2p.p2p.peers().iter().filter_map(|(i, p)| {
node.p2p.p2p.peers().iter().filter_map(|(i, p)| { PeerMetadata::from_hashmap(&p.metadata())
PeerMetadata::from_hashmap(&p.metadata()) .ok()
.ok() .map(|m| (i, p, m))
.map(|m| (i, p, m)) }) {
}) { queued.push(P2PEvent::PeerChange {
let identity = *identity; identity: peer.identity(),
match peer.is_connected() { connection: if peer.is_connected_with_hook(node.p2p.libraries_hook_id) {
true => queued.push(P2PEvent::ConnectedPeer { identity }), ConnectionMethod::Relay
false => queued.push(P2PEvent::DiscoveredPeer { identity, metadata }), } else if peer.is_connected() {
} ConnectionMethod::Local
} else {
ConnectionMethod::Disconnected
},
discovery: match peer
.connection_candidates()
.contains(&PeerConnectionCandidate::Relay)
{
true => DiscoveryMethod::Relay,
false => DiscoveryMethod::Local,
},
metadata,
});
} }
Ok(async_stream::stream! { Ok(async_stream::stream! {

View file

@ -1,26 +0,0 @@
// TODO: This is unused but will be used in the future.
// use std::sync::Arc;
// use sd_p2p2::{flume::bounded, HookEvent, P2P};
// /// A P2P hook which listens for the availability of peers and connects with them.
// pub struct ConnectHook {}
// impl ConnectHook {
// pub fn spawn(p2p: Arc<P2P>) -> Self {
// let (tx, rx) = bounded(15);
// let _ = p2p.register_hook("sd-connect-hook", tx);
// tokio::spawn(async move {
// while let Ok(event) = rx.recv_async().await {
// match event {
// // TODO: Do the thing. For now we don't need this.
// HookEvent::Shutdown { _guard } => break,
// _ => continue,
// }
// }
// });
// Self {}
// }
// }

View file

@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use sd_p2p2::{flume::bounded, HookEvent, RemoteIdentity, P2P}; use sd_p2p2::{flume::bounded, HookEvent, HookId, PeerConnectionCandidate, RemoteIdentity, P2P};
use serde::Serialize; use serde::Serialize;
use specta::Type; use specta::Type;
use tokio::sync::broadcast; use tokio::sync::broadcast;
@ -8,21 +8,41 @@ use uuid::Uuid;
use super::PeerMetadata; use super::PeerMetadata;
/// TODO: P2P event for the frontend /// The method used for the connection with this peer.
/// *Technically* you can have multiple under the hood but this simplifies things for the UX.
#[derive(Debug, Clone, Serialize, Type)]
pub enum ConnectionMethod {
// Connected via the SD Relay
Relay,
// Connected directly via an IP address
Local,
// Not connected
Disconnected,
}
/// The method used for the discovery of this peer.
/// *Technically* you can have multiple under the hood but this simplifies things for the UX.
#[derive(Debug, Clone, Serialize, Type)]
pub enum DiscoveryMethod {
// Found via the SD Relay
Relay,
// Found via mDNS or a manual IP
Local,
}
// This is used for synchronizing events between the backend and the frontend.
#[derive(Debug, Clone, Serialize, Type)] #[derive(Debug, Clone, Serialize, Type)]
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum P2PEvent { pub enum P2PEvent {
DiscoveredPeer { // An add or update event
PeerChange {
identity: RemoteIdentity, identity: RemoteIdentity,
connection: ConnectionMethod,
discovery: DiscoveryMethod,
metadata: PeerMetadata, metadata: PeerMetadata,
}, },
ExpiredPeer { // Delete a peer
identity: RemoteIdentity, PeerDelete {
},
ConnectedPeer {
identity: RemoteIdentity,
},
DisconnectedPeer {
identity: RemoteIdentity, identity: RemoteIdentity,
}, },
SpacedropRequest { SpacedropRequest {
@ -49,7 +69,7 @@ pub struct P2PEvents {
} }
impl P2PEvents { impl P2PEvents {
pub fn spawn(p2p: Arc<P2P>) -> Self { pub fn spawn(p2p: Arc<P2P>, libraries_hook_id: HookId) -> Self {
let events = broadcast::channel(15); let events = broadcast::channel(15);
let (tx, rx) = bounded(15); let (tx, rx) = bounded(15);
let _ = p2p.register_hook("sd-frontend-events", tx); let _ = p2p.register_hook("sd-frontend-events", tx);
@ -60,7 +80,7 @@ impl P2PEvents {
let event = match event { let event = match event {
// We use `HookEvent::PeerUnavailable`/`HookEvent::PeerAvailable` over `HookEvent::PeerExpiredBy`/`HookEvent::PeerDiscoveredBy` so that having an active connection is treated as "discovered". // We use `HookEvent::PeerUnavailable`/`HookEvent::PeerAvailable` over `HookEvent::PeerExpiredBy`/`HookEvent::PeerDiscoveredBy` so that having an active connection is treated as "discovered".
// It's possible to have an active connection without mDNS data (which is what Peer*By` are for) // It's possible to have an active connection without mDNS data (which is what Peer*By` are for)
HookEvent::PeerAvailable(peer) => { HookEvent::PeerConnectedWith(_, peer) | HookEvent::PeerAvailable(peer) => {
let metadata = match PeerMetadata::from_hashmap(&peer.metadata()) { let metadata = match PeerMetadata::from_hashmap(&peer.metadata()) {
Ok(metadata) => metadata, Ok(metadata) => metadata,
Err(e) => { Err(e) => {
@ -73,15 +93,26 @@ impl P2PEvents {
} }
}; };
P2PEvent::DiscoveredPeer { P2PEvent::PeerChange {
identity: peer.identity(), identity: peer.identity(),
connection: if peer.is_connected_with_hook(libraries_hook_id) {
ConnectionMethod::Relay
} else if peer.is_connected() {
ConnectionMethod::Local
} else {
ConnectionMethod::Disconnected
},
discovery: match peer
.connection_candidates()
.contains(&PeerConnectionCandidate::Relay)
{
true => DiscoveryMethod::Relay,
false => DiscoveryMethod::Local,
},
metadata, metadata,
} }
} }
HookEvent::PeerUnavailable(identity) => P2PEvent::ExpiredPeer { identity }, HookEvent::PeerUnavailable(identity) => P2PEvent::PeerDelete { identity },
HookEvent::PeerConnectedWith(_, peer) => P2PEvent::ConnectedPeer {
identity: peer.identity(),
},
HookEvent::PeerDisconnectedWith(_, identity) => { HookEvent::PeerDisconnectedWith(_, identity) => {
let peers = p2p.peers(); let peers = p2p.peers();
let Some(peer) = peers.get(&identity) else { let Some(peer) = peers.get(&identity) else {
@ -89,7 +120,7 @@ impl P2PEvents {
}; };
if !peer.is_connected() { if !peer.is_connected() {
P2PEvent::DisconnectedPeer { identity } P2PEvent::PeerDelete { identity }
} else { } else {
continue; continue;
} }

View file

@ -1,6 +1,8 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use sd_p2p2::{flume::bounded, HookEvent, IdentityOrRemoteIdentity, PeerConnectionCandidate, P2P}; use sd_p2p2::{
flume::bounded, HookEvent, HookId, IdentityOrRemoteIdentity, PeerConnectionCandidate, P2P,
};
use tracing::error; use tracing::error;
use crate::library::{Libraries, LibraryManagerEvent}; use crate::library::{Libraries, LibraryManagerEvent};
@ -10,94 +12,93 @@ use crate::library::{Libraries, LibraryManagerEvent};
/// This hooks is responsible for: /// This hooks is responsible for:
/// - injecting library peers into the P2P system so we can connect to them over internet. /// - injecting library peers into the P2P system so we can connect to them over internet.
/// ///
pub struct LibrariesHook {} pub fn libraries_hook(p2p: Arc<P2P>, libraries: Arc<Libraries>) -> HookId {
let (tx, rx) = bounded(15);
let hook_id = p2p.register_hook("sd-libraries-hook", tx);
impl LibrariesHook { let handle = tokio::spawn(async move {
pub fn spawn(p2p: Arc<P2P>, libraries: Arc<Libraries>) -> Self { if let Err(err) = libraries
let (tx, rx) = bounded(15); .rx
let hook_id = p2p.register_hook("sd-libraries-hook", tx); .clone()
.subscribe(|msg| {
let p2p = p2p.clone();
async move {
match msg {
LibraryManagerEvent::InstancesModified(library)
| LibraryManagerEvent::Load(library) => {
p2p.metadata_mut().insert(
library.id.to_string(),
library.identity.to_remote_identity().to_string(),
);
let handle = tokio::spawn(async move { let Ok(instances) =
if let Err(err) = libraries library.db.instance().find_many(vec![]).exec().await
.rx else {
.clone() return;
.subscribe(|msg| { };
let p2p = p2p.clone();
async move { for i in instances.iter() {
match msg { let identity = IdentityOrRemoteIdentity::from_bytes(&i.identity)
LibraryManagerEvent::InstancesModified(library) .expect("lol: invalid DB entry")
| LibraryManagerEvent::Load(library) => { .remote_identity();
p2p.metadata_mut().insert(
library.id.to_string(), // Skip self
library.identity.to_remote_identity().to_string(), if identity == library.identity.to_remote_identity() {
continue;
}
p2p.clone().discover_peer(
hook_id,
identity,
HashMap::new(), // TODO: We should probs cache this so we have something
[PeerConnectionCandidate::Relay].into_iter().collect(),
); );
let Ok(instances) =
library.db.instance().find_many(vec![]).exec().await
else {
return;
};
for i in instances.iter() {
let identity =
IdentityOrRemoteIdentity::from_bytes(&i.identity)
.expect("lol: invalid DB entry")
.remote_identity();
p2p.clone().discover_peer(
hook_id,
identity,
HashMap::new(), // TODO: We should probs cache this so we have something
[PeerConnectionCandidate::Relay].into_iter().collect(),
);
}
} }
LibraryManagerEvent::Edit(_library) => { }
// TODO: Send changes to all connected nodes or queue sending for when they are online! LibraryManagerEvent::Edit(_library) => {
} // TODO: Send changes to all connected nodes or queue sending for when they are online!
LibraryManagerEvent::Delete(library) => { }
p2p.metadata_mut().remove(&library.id.to_string()); LibraryManagerEvent::Delete(library) => {
p2p.metadata_mut().remove(&library.id.to_string());
let Ok(instances) = let Ok(instances) =
library.db.instance().find_many(vec![]).exec().await library.db.instance().find_many(vec![]).exec().await
else { else {
return; return;
};
for i in instances.iter() {
let identity = IdentityOrRemoteIdentity::from_bytes(&i.identity)
.expect("lol: invalid DB entry")
.remote_identity();
let peers = p2p.peers();
let Some(peer) = peers.get(&identity) else {
continue;
}; };
peer.undiscover_peer(hook_id);
for i in instances.iter() {
let identity =
IdentityOrRemoteIdentity::from_bytes(&i.identity)
.expect("lol: invalid DB entry")
.remote_identity();
let peers = p2p.peers();
let Some(peer) = peers.get(&identity) else {
continue;
};
peer.undiscover_peer(hook_id);
}
} }
} }
} }
})
.await
{
error!("Core may become unstable! `LibraryServices::start` manager aborted with error: {err:?}");
}
});
tokio::spawn(async move {
while let Ok(event) = rx.recv_async().await {
match event {
HookEvent::Shutdown { _guard } => {
handle.abort();
break;
}
_ => continue,
} }
} })
}); .await
{
error!("Core may become unstable! `LibraryServices::start` manager aborted with error: {err:?}");
}
});
Self {} tokio::spawn(async move {
} while let Ok(event) = rx.recv_async().await {
match event {
HookEvent::Shutdown { _guard } => {
handle.abort();
break;
}
_ => continue,
}
}
});
hook_id
} }

View file

@ -3,7 +3,10 @@ use crate::{
config::{self, P2PDiscoveryState, Port}, config::{self, P2PDiscoveryState, Port},
get_hardware_model_name, HardwareModel, get_hardware_model_name, HardwareModel,
}, },
p2p::{operations, sync::SyncMessage, Header, OperatingSystem, SPACEDRIVE_APP_ID}, p2p::{
libraries::libraries_hook, operations, sync::SyncMessage, Header, OperatingSystem,
SPACEDRIVE_APP_ID,
},
Node, Node,
}; };
@ -11,7 +14,7 @@ use axum::routing::IntoMakeService;
use sd_p2p2::{ use sd_p2p2::{
flume::{bounded, Receiver}, flume::{bounded, Receiver},
Libp2pPeerId, Listener, Mdns, Peer, QuicTransport, RelayServerEntry, RemoteIdentity, HookId, Libp2pPeerId, Listener, Mdns, Peer, QuicTransport, RelayServerEntry, RemoteIdentity,
UnicastStream, P2P, UnicastStream, P2P,
}; };
use sd_p2p_tunnel::Tunnel; use sd_p2p_tunnel::Tunnel;
@ -32,7 +35,7 @@ use tokio::sync::oneshot;
use tracing::info; use tracing::info;
use uuid::Uuid; use uuid::Uuid;
use super::{libraries::LibrariesHook, P2PEvents, PeerMetadata}; use super::{P2PEvents, PeerMetadata};
pub struct P2PManager { pub struct P2PManager {
pub(crate) p2p: Arc<P2P>, pub(crate) p2p: Arc<P2P>,
@ -45,6 +48,7 @@ pub struct P2PManager {
pub(super) spacedrop_pairing_reqs: Arc<Mutex<HashMap<Uuid, oneshot::Sender<Option<String>>>>>, pub(super) spacedrop_pairing_reqs: Arc<Mutex<HashMap<Uuid, oneshot::Sender<Option<String>>>>>,
pub(super) spacedrop_cancellations: Arc<Mutex<HashMap<Uuid, Arc<AtomicBool>>>>, pub(super) spacedrop_cancellations: Arc<Mutex<HashMap<Uuid, Arc<AtomicBool>>>>,
pub(crate) node_config: Arc<config::Manager>, pub(crate) node_config: Arc<config::Manager>,
pub libraries_hook_id: HookId,
} }
impl P2PManager { impl P2PManager {
@ -61,20 +65,20 @@ impl P2PManager {
let (tx, rx) = bounded(25); let (tx, rx) = bounded(25);
let p2p = P2P::new(SPACEDRIVE_APP_ID, node_config.get().await.identity, tx); let p2p = P2P::new(SPACEDRIVE_APP_ID, node_config.get().await.identity, tx);
let (quic, lp2p_peer_id) = QuicTransport::spawn(p2p.clone())?; let (quic, lp2p_peer_id) = QuicTransport::spawn(p2p.clone())?;
let libraries_hook_id = libraries_hook(p2p.clone(), libraries);
let this = Arc::new(Self { let this = Arc::new(Self {
p2p: p2p.clone(), p2p: p2p.clone(),
lp2p_peer_id, lp2p_peer_id,
mdns: Mutex::new(None), mdns: Mutex::new(None),
quic, quic,
events: P2PEvents::spawn(p2p.clone()), events: P2PEvents::spawn(p2p.clone(), libraries_hook_id),
spacedrop_pairing_reqs: Default::default(), spacedrop_pairing_reqs: Default::default(),
spacedrop_cancellations: Default::default(), spacedrop_cancellations: Default::default(),
node_config, node_config,
libraries_hook_id,
}); });
this.on_node_config_change().await; this.on_node_config_change().await;
LibrariesHook::spawn(this.p2p.clone(), libraries);
info!( info!(
"Node RemoteIdentity('{}') libp2p::PeerId('{:?}') is now online listening at addresses: {:?}", "Node RemoteIdentity('{}') libp2p::PeerId('{:?}') is now online listening at addresses: {:?}",
this.p2p.remote_identity(), this.p2p.remote_identity(),
@ -104,7 +108,7 @@ impl P2PManager {
} else { } else {
match resp.json::<Vec<RelayServerEntry>>().await { match resp.json::<Vec<RelayServerEntry>>().await {
Ok(config) => { Ok(config) => {
this.quic.relay_config(config).await; this.quic.set_relay_config(config).await;
info!("Updated p2p relay configuration successfully.") info!("Updated p2p relay configuration successfully.")
} }
Err(err) => { Err(err) => {
@ -252,8 +256,8 @@ impl P2PManager {
"p2p_ipv4_port": node_config.p2p_ipv4_port, "p2p_ipv4_port": node_config.p2p_ipv4_port,
"p2p_ipv6_port": node_config.p2p_ipv6_port, "p2p_ipv6_port": node_config.p2p_ipv6_port,
"p2p_discovery": node_config.p2p_discovery, "p2p_discovery": node_config.p2p_discovery,
}) }),
"relay_config": self.quic.get_relay_config(),
}) })
} }

View file

@ -1,7 +1,6 @@
#![warn(clippy::all, clippy::unwrap_used, clippy::panic)] #![warn(clippy::all, clippy::unwrap_used, clippy::panic)]
#![allow(clippy::unnecessary_cast)] // Yeah they aren't necessary on this arch, but they are on others #![allow(clippy::unnecessary_cast)] // Yeah they aren't necessary on this arch, but they are on others
mod connect_hook;
mod events; mod events;
pub(super) mod libraries; pub(super) mod libraries;
mod manager; mod manager;
@ -10,7 +9,6 @@ pub mod operations;
mod protocol; mod protocol;
pub mod sync; pub mod sync;
// pub use connect_hook::*;
pub use events::*; pub use events::*;
pub use manager::*; pub use manager::*;
pub use metadata::*; pub use metadata::*;

View file

@ -24,7 +24,6 @@ pub struct Peer {
} }
// The order of this enum is the preference of the connection type. // The order of this enum is the preference of the connection type.
#[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd, Ord)] #[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd, Ord)]
pub enum PeerConnectionCandidate { pub enum PeerConnectionCandidate {
SocketAddr(SocketAddr), SocketAddr(SocketAddr),
@ -107,6 +106,25 @@ impl Peer {
.is_empty() .is_empty()
} }
pub fn can_connect_with(&self, hook_id: HookId) -> bool {
self.state
.read()
.unwrap_or_else(PoisonError::into_inner)
.discovered
.contains_key(&hook_id)
}
pub fn connection_candidates(&self) -> BTreeSet<PeerConnectionCandidate> {
self.state
.read()
.unwrap_or_else(PoisonError::into_inner)
.discovered
.values()
.cloned()
.flatten()
.collect()
}
pub fn is_connected(&self) -> bool { pub fn is_connected(&self) -> bool {
!self !self
.state .state
@ -124,6 +142,23 @@ impl Peer {
.len() .len()
} }
// TODO: Possibly remove this, it's not great???
pub fn is_connected_with_hook(&self, hook_id: HookId) -> bool {
self.state
.read()
.unwrap_or_else(PoisonError::into_inner)
.active_connections
.contains_key(&ListenerId(hook_id.0))
}
pub fn is_connected_with(&self, listener_id: ListenerId) -> bool {
self.state
.read()
.unwrap_or_else(PoisonError::into_inner)
.active_connections
.contains_key(&listener_id)
}
pub fn connection_methods(&self) -> HashSet<ListenerId> { pub fn connection_methods(&self) -> HashSet<ListenerId> {
self.state self.state
.read() .read()

View file

@ -2,7 +2,7 @@ use std::{
collections::HashMap, collections::HashMap,
net::{Ipv4Addr, Ipv6Addr, SocketAddr}, net::{Ipv4Addr, Ipv6Addr, SocketAddr},
str::FromStr, str::FromStr,
sync::{Arc, PoisonError, RwLock}, sync::{Arc, Mutex, PoisonError, RwLock},
time::Duration, time::Duration,
}; };
@ -60,7 +60,7 @@ enum InternalEvent {
}, },
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayServerEntry { pub struct RelayServerEntry {
id: Uuid, id: Uuid,
peer_id: String, peer_id: String,
@ -85,6 +85,7 @@ pub struct QuicTransport {
id: ListenerId, id: ListenerId,
p2p: Arc<P2P>, p2p: Arc<P2P>,
internal_tx: Sender<InternalEvent>, internal_tx: Sender<InternalEvent>,
relay_config: Mutex<Vec<RelayServerEntry>>,
} }
impl QuicTransport { impl QuicTransport {
@ -125,6 +126,7 @@ impl QuicTransport {
id, id,
p2p, p2p,
internal_tx, internal_tx,
relay_config: Mutex::new(Vec::new()),
}, },
libp2p_peer_id, libp2p_peer_id,
)) ))
@ -132,19 +134,34 @@ impl QuicTransport {
/// Configure the relay servers to use. /// Configure the relay servers to use.
/// This method will replace any existing relay servers. /// This method will replace any existing relay servers.
pub async fn relay_config(&self, relays: Vec<RelayServerEntry>) { pub async fn set_relay_config(&self, relays: Vec<RelayServerEntry>) {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let event = InternalEvent::RegisterRelays { relays, result: tx }; let event = InternalEvent::RegisterRelays {
relays: relays.clone(),
result: tx,
};
let Ok(_) = self.internal_tx.send(event) else { let Ok(_) = self.internal_tx.send(event) else {
return; return;
}; };
match rx.await.unwrap_or_else(|_| Ok(())) { match rx.await.unwrap_or_else(|_| Ok(())) {
Ok(_) => {} Ok(_) => {
*self
.relay_config
.lock()
.unwrap_or_else(PoisonError::into_inner) = relays;
}
Err(e) => error!("Failed to register relay config as the event loop has died: {e}"), Err(e) => error!("Failed to register relay config as the event loop has died: {e}"),
} }
} }
pub fn get_relay_config(&self) -> Vec<RelayServerEntry> {
self.relay_config
.lock()
.unwrap_or_else(PoisonError::into_inner)
.clone()
}
// `None` on the port means disabled. Use `0` for random port. // `None` on the port means disabled. Use `0` for random port.
pub async fn set_ipv4_enabled(&self, port: Option<u16>) -> Result<(), String> { pub async fn set_ipv4_enabled(&self, port: Option<u16>) -> Result<(), String> {
self.setup_listener( self.setup_listener(

View file

@ -223,7 +223,7 @@ const SpacedropNodes = () => {
return Array.from(discoveredPeers).map(([id, peer]) => ( return Array.from(discoveredPeers).map(([id, peer]) => (
<Menu.Item <Menu.Item
key={id} key={id}
label={peer.name} label={peer.metadata.name}
disabled={spacedrop.isLoading} disabled={spacedrop.isLoading}
onClick={async () => { onClick={async () => {
spacedrop.mutateAsync({ spacedrop.mutateAsync({

View file

@ -128,7 +128,8 @@ export function Spacedrop({ triggerClose }: { triggerClose: () => void }) {
<Node <Node
key={id} key={id}
id={id} id={id}
name={meta.name as HardwareModel} name={meta.metadata.name}
model={meta.metadata.device_model ?? 'Other'}
onDropped={onDropped} onDropped={onDropped}
/> />
))} ))}
@ -142,10 +143,12 @@ export function Spacedrop({ triggerClose }: { triggerClose: () => void }) {
function Node({ function Node({
id, id,
name, name,
model,
onDropped onDropped
}: { }: {
id: string; id: string;
name: HardwareModel; name: string;
model: HardwareModel;
onDropped: (id: string, files: string[]) => void; onDropped: (id: string, files: string[]) => void;
}) { }) {
const ref = useRef<HTMLDivElement>(null); const ref = useRef<HTMLDivElement>(null);
@ -178,7 +181,7 @@ function Node({
}); });
}} }}
> >
<Icon name={hardwareModelToIcon(name)} size={20} /> <Icon name={hardwareModelToIcon(model)} size={20} />
<h1>{name}</h1> <h1>{name}</h1>
</div> </div>
); );

View file

@ -39,7 +39,7 @@ export const Component = () => {
has_local_thumbnail: false, has_local_thumbnail: false,
thumbnail: null, thumbnail: null,
item: { item: {
...peer, ...peer.metadata,
pub_id: [] pub_id: []
} }
})), })),

View file

@ -190,6 +190,12 @@ export type Composite =
*/ */
"Live" "Live"
/**
* The method used for the connection with this peer.
* *Technically* you can have multiple under the hood but this simplifies things for the UX.
*/
export type ConnectionMethod = "Relay" | "Local" | "Disconnected"
export type ConvertImageArgs = { location_id: number; file_path_id: number; delete_src: boolean; desired_extension: ConvertibleExtension; quality_percentage: number | null } export type ConvertImageArgs = { location_id: number; file_path_id: number; delete_src: boolean; desired_extension: ConvertibleExtension; quality_percentage: number | null }
export type ConvertibleExtension = "bmp" | "dib" | "ff" | "gif" | "ico" | "jpg" | "jpeg" | "png" | "pnm" | "qoi" | "tga" | "icb" | "vda" | "vst" | "tiff" | "tif" | "hif" | "heif" | "heifs" | "heic" | "heics" | "avif" | "avci" | "avcs" | "svg" | "svgz" | "pdf" | "webp" export type ConvertibleExtension = "bmp" | "dib" | "ff" | "gif" | "ico" | "jpg" | "jpeg" | "png" | "pnm" | "qoi" | "tga" | "icb" | "vda" | "vst" | "tiff" | "tif" | "hif" | "heif" | "heifs" | "heic" | "heics" | "avif" | "avci" | "avcs" | "svg" | "svgz" | "pdf" | "webp"
@ -204,6 +210,12 @@ export type CursorOrderItem<T> = { order: SortOrder; data: T }
export type DefaultLocations = { desktop: boolean; documents: boolean; downloads: boolean; pictures: boolean; music: boolean; videos: boolean } export type DefaultLocations = { desktop: boolean; documents: boolean; downloads: boolean; pictures: boolean; music: boolean; videos: boolean }
/**
* The method used for the discovery of this peer.
* *Technically* you can have multiple under the hood but this simplifies things for the UX.
*/
export type DiscoveryMethod = "Relay" | "Local"
export type DiskType = "SSD" | "HDD" | "Removable" export type DiskType = "SSD" | "HDD" | "Removable"
export type DoubleClickAction = "openFile" | "quickPreview" export type DoubleClickAction = "openFile" | "quickPreview"
@ -511,10 +523,7 @@ export type Orientation = "Normal" | "CW90" | "CW180" | "CW270" | "MirroredVerti
export type P2PDiscoveryState = "Everyone" | "ContactsOnly" | "Disabled" export type P2PDiscoveryState = "Everyone" | "ContactsOnly" | "Disabled"
/** export type P2PEvent = { type: "PeerChange"; identity: RemoteIdentity; connection: ConnectionMethod; discovery: DiscoveryMethod; metadata: PeerMetadata } | { type: "PeerDelete"; identity: RemoteIdentity } | { type: "SpacedropRequest"; id: string; identity: RemoteIdentity; peer_name: string; files: string[] } | { type: "SpacedropProgress"; id: string; percent: number } | { type: "SpacedropTimedOut"; id: string } | { type: "SpacedropRejected"; id: string }
* TODO: P2P event for the frontend
*/
export type P2PEvent = { type: "DiscoveredPeer"; identity: RemoteIdentity; metadata: PeerMetadata } | { type: "ExpiredPeer"; identity: RemoteIdentity } | { type: "ConnectedPeer"; identity: RemoteIdentity } | { type: "DisconnectedPeer"; identity: RemoteIdentity } | { type: "SpacedropRequest"; id: string; identity: RemoteIdentity; peer_name: string; files: string[] } | { type: "SpacedropProgress"; id: string; percent: number } | { type: "SpacedropTimedOut"; id: string } | { type: "SpacedropRejected"; id: string }
export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; device_model: HardwareModel | null; version: string | null } export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; device_model: HardwareModel | null; version: string | null }

View file

@ -8,12 +8,17 @@ import {
useState useState
} from 'react'; } from 'react';
import { P2PEvent, PeerMetadata } from '../core'; import { ConnectionMethod, DiscoveryMethod, P2PEvent, PeerMetadata } from '../core';
import { useBridgeSubscription } from '../rspc'; import { useBridgeSubscription } from '../rspc';
type Peer = {
connection: ConnectionMethod;
discovery: DiscoveryMethod;
metadata: PeerMetadata;
};
type Context = { type Context = {
discoveredPeers: Map<string, PeerMetadata>; peers: Map<string, Peer>;
connectedPeers: Map<string, undefined>;
spacedropProgresses: Map<string, number>; spacedropProgresses: Map<string, number>;
events: MutableRefObject<EventTarget>; events: MutableRefObject<EventTarget>;
}; };
@ -22,26 +27,23 @@ const Context = createContext<Context>(null as any);
export function P2PContextProvider({ children }: PropsWithChildren) { export function P2PContextProvider({ children }: PropsWithChildren) {
const events = useRef(new EventTarget()); const events = useRef(new EventTarget());
const [[discoveredPeers], setDiscoveredPeer] = useState([new Map<string, PeerMetadata>()]); const [[peers], setPeers] = useState([new Map<string, Peer>()]);
const [[connectedPeers], setConnectedPeers] = useState([new Map<string, undefined>()]);
const [[spacedropProgresses], setSpacedropProgresses] = useState([new Map<string, number>()]); const [[spacedropProgresses], setSpacedropProgresses] = useState([new Map<string, number>()]);
useBridgeSubscription(['p2p.events'], { useBridgeSubscription(['p2p.events'], {
onData(data) { onData(data) {
events.current.dispatchEvent(new CustomEvent('p2p-event', { detail: data })); events.current.dispatchEvent(new CustomEvent('p2p-event', { detail: data }));
if (data.type === 'DiscoveredPeer') { if (data.type === 'PeerChange') {
discoveredPeers.set(data.identity, data.metadata); peers.set(data.identity, {
setDiscoveredPeer([discoveredPeers]); connection: data.connection,
} else if (data.type === 'ExpiredPeer') { discovery: data.discovery,
discoveredPeers.delete(data.identity); metadata: data.metadata
setDiscoveredPeer([discoveredPeers]); });
} else if (data.type === 'ConnectedPeer') { setPeers([peers]);
connectedPeers.set(data.identity, undefined); } else if (data.type === 'PeerDelete') {
setConnectedPeers([connectedPeers]); peers.delete(data.identity);
} else if (data.type === 'DisconnectedPeer') { setPeers([peers]);
connectedPeers.delete(data.identity);
setConnectedPeers([connectedPeers]);
} else if (data.type === 'SpacedropProgress') { } else if (data.type === 'SpacedropProgress') {
spacedropProgresses.set(data.id, data.percent); spacedropProgresses.set(data.id, data.percent);
setSpacedropProgresses([spacedropProgresses]); setSpacedropProgresses([spacedropProgresses]);
@ -52,8 +54,7 @@ export function P2PContextProvider({ children }: PropsWithChildren) {
return ( return (
<Context.Provider <Context.Provider
value={{ value={{
discoveredPeers, peers,
connectedPeers,
spacedropProgresses, spacedropProgresses,
events events
}} }}
@ -67,12 +68,16 @@ export function useP2PContextRaw() {
return useContext(Context); return useContext(Context);
} }
export function usePeers() {
return useContext(Context).peers;
}
export function useDiscoveredPeers() { export function useDiscoveredPeers() {
return useContext(Context).discoveredPeers; return new Map([...usePeers()].filter(([, peer]) => peer.connection === 'Disconnected'));
} }
export function useConnectedPeers() { export function useConnectedPeers() {
return useContext(Context).connectedPeers; return new Map([...usePeers()].filter(([, peer]) => peer.connection !== 'Disconnected'));
} }
export function useSpacedropProgress(id: string) { export function useSpacedropProgress(id: string) {