[ENG-994] Minor P2P fixes (#1238)

* mdns advertisement on detection

* fix bug in P2P discovery

* Add `P2PEvent::ExpirePeer`

* React properly

* Consistent tracing versions

* better logger config

* Reduce excessive logging from P2P

* Fix panic log hook

* Remove `dbg`

* Fix bug in thumbnailer remover

* wip: Connected peers in UI

* `Sync` is large pain

* init services after library load

* improve error handling of logger

* Sync protocol shutdown

* Add "updater" feature

* fix commands bindings

* nlm state debug query

* nlm debug status

* Make TS happy

* Add `RemoteIdentity` to libraries debug info

* Improve debug data presentation

* Among us level sus

* minor fix panic hook

* Fix EOF issue

* fix instance not being added on pairing

---------

Co-authored-by: Utku <74243531+utkubakir@users.noreply.github.com>
Co-authored-by: Ericson "Fogo" Soares <ericson.ds999@gmail.com>
This commit is contained in:
Oscar Beaumont 2023-08-27 14:41:26 +08:00 committed by GitHub
parent 09857de4c8
commit 7bf0c0ae4a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 503 additions and 267 deletions

14
Cargo.lock generated
View file

@ -7047,7 +7047,7 @@ dependencies = [
"tauri-build",
"tauri-specta",
"tokio",
"tracing 0.1.37",
"tracing 0.2.0",
"uuid",
"window-shadows",
]
@ -7087,7 +7087,7 @@ dependencies = [
"tempfile",
"thiserror",
"tokio",
"tracing 0.1.37",
"tracing 0.2.0",
"webp",
]
@ -7126,7 +7126,7 @@ version = "0.1.0"
dependencies = [
"jni 0.19.0",
"sd-mobile-core",
"tracing 0.1.37",
"tracing 0.2.0",
]
[[package]]
@ -7143,7 +7143,7 @@ dependencies = [
"sd-core",
"serde_json",
"tokio",
"tracing 0.1.37",
"tracing 0.2.0",
]
[[package]]
@ -7175,8 +7175,8 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tracing 0.1.37",
"tracing-subscriber 0.3.17",
"tracing 0.2.0",
"tracing-subscriber 0.3.0",
"uuid",
]
@ -7204,7 +7204,7 @@ dependencies = [
"sd-core",
"tokio",
"tower-http",
"tracing 0.1.37",
"tracing 0.2.0",
]
[[package]]

View file

@ -35,6 +35,12 @@ prisma-client-rust-sdk = { git = "https://github.com/Brendonovich/prisma-client-
"sqlite",
], default-features = false }
tracing = { git = "https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e" } # To work with tracing-appender
tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e", features = [
"env-filter",
] } # To work with tracing-appender
tracing-appender = { git = "https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e" } # Unreleased changes for rolling log deletion
rspc = { version = "0.1.4" }
specta = { version = "1.0.4" }
httpz = { version = "0.0.3" }

View file

@ -32,7 +32,7 @@ sd-core = { path = "../../../core", features = [
] }
tokio = { workspace = true, features = ["sync"] }
window-shadows = "0.2.1"
tracing = "0.1.37"
tracing = { workspace = true }
serde = "1.0.163"
percent-encoding = "2.2.0"
http = "0.2.9"

View file

@ -97,11 +97,11 @@ async fn main() -> tauri::Result<()> {
#[cfg(debug_assertions)]
let data_dir = data_dir.join("dev");
// Initialize and configure app logging
// Return value must be assigned to variable for flushing remaining logs on main exit throught Drop
let _guard = Node::init_logger(&data_dir);
let result = Node::new(data_dir).await;
// The `_guard` must be assigned to variable for flushing remaining logs on main exit through Drop
let (_guard, result) = match Node::init_logger(&data_dir) {
Ok(guard) => (Some(guard), Node::new(data_dir).await),
Err(err) => (None, Err(NodeError::Logger(err))),
};
let app = tauri::Builder::default();

View file

@ -18,4 +18,4 @@ jni = "0.19.0"
sd-mobile-core = { path = "../core" }
# Other
tracing = "0.1.37"
tracing = { workspace = true }

View file

@ -21,6 +21,6 @@ openssl-sys = { version = "0.9.88", features = [
"vendored",
] } # Override features of transitive dependencies to support IOS Simulator on M1
futures = "0.3.28"
tracing = "0.1.37"
tracing = { workspace = true }
futures-channel = "0.3.28"
futures-locks = "0.7.1"

View file

@ -18,7 +18,7 @@ rspc = { workspace = true, features = ["axum"] }
httpz = { workspace = true, features = ["axum"] }
axum = "0.6.18"
tokio = { workspace = true, features = ["sync", "rt-multi-thread", "signal"] }
tracing = "0.1.37"
tracing = { workspace = true }
ctrlc = "3.3.1"
http = "0.2.9"
tower-http = { version = "0.4.0", features = ["fs"] }

View file

@ -32,7 +32,12 @@ async fn main() {
.map(|port| port.parse::<u16>().unwrap_or(8080))
.unwrap_or(8080);
let _guard = Node::init_logger(&data_dir);
let _guard = match Node::init_logger(&data_dir) {
Ok(guard) => guard,
Err(e) => {
panic!("{}", e.to_string())
}
};
let (node, router) = match Node::new(data_dir).await {
Ok(d) => d,

View file

@ -70,8 +70,8 @@ include_dir = { version = "0.7.3", features = ["glob"] }
async-trait = "^0.1.68"
image = "0.24.6"
webp = "0.2.2"
tracing = { git = "https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e" } # To work with tracing-appender
tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e", features = [
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"env-filter",
] }
async-stream = "0.3.5"
@ -90,7 +90,7 @@ notify = { version = "5.2.0", default-features = false, features = [
static_assertions = "1.1.0"
serde-hashkey = "0.4.5"
normpath = { version = "1.1.1", features = ["localization"] }
tracing-appender = { git = "https://github.com/tokio-rs/tracing", rev = "29146260fb4615d271d2e899ad95a753bb42915e" } # Unreleased changes for log deletion
tracing-appender = { workspace = true }
strum = { version = "0.24", features = ["derive"] }
strum_macros = "0.24"
regex = "1.8.4"

View file

@ -6,6 +6,7 @@ use crate::{
use chrono::Utc;
use rspc::alpha::AlphaRouter;
use sd_p2p::spacetunnel::RemoteIdentity;
use sd_prisma::prisma::statistics;
use serde::{Deserialize, Serialize};
use specta::Type;
@ -18,9 +19,11 @@ use super::{
};
// TODO(@Oscar): Replace with `specta::json`
#[derive(Serialize, Deserialize, Type)]
#[derive(Serialize, Type)]
pub struct LibraryConfigWrapped {
pub uuid: Uuid,
pub instance_id: Uuid,
pub instance_public_key: RemoteIdentity,
pub config: LibraryConfig,
}
@ -34,6 +37,8 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.into_iter()
.map(|lib| LibraryConfigWrapped {
uuid: lib.id,
instance_id: lib.instance_uuid,
instance_public_key: lib.identity.to_remote_identity(),
config: lib.config.clone(),
})
.collect::<Vec<_>>()
@ -114,6 +119,8 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
Ok(LibraryConfigWrapped {
uuid: library.id,
instance_id: library.instance_uuid,
instance_public_key: library.identity.to_remote_identity(),
config: library.config.clone(),
})
})

View file

@ -23,11 +23,13 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
};
}
// // TODO: Don't block subscription start
// for peer in ctx.p2p_manager.get_connected_peers().await.unwrap() {
// // TODO: Send to frontend
// }
// TODO: Don't block subscription start
for peer_id in node.p2p.manager.get_connected_peers().await.unwrap() {
yield P2PEvent::ConnectedPeer {
peer_id,
};
}
while let Ok(event) = rx.recv().await {
yield event;
@ -35,6 +37,9 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
}
})
})
.procedure("nlmState", {
R.query(|node, _: ()| async move { node.nlm.state().await })
})
.procedure("spacedrop", {
#[derive(Type, Deserialize)]
pub struct SpacedropArgs {

View file

@ -14,7 +14,7 @@ use notifications::Notifications;
pub use sd_prisma::*;
use std::{
fmt,
env, fmt,
path::{Path, PathBuf},
sync::Arc,
};
@ -26,7 +26,12 @@ use tracing_appender::{
non_blocking::{NonBlocking, WorkerGuard},
rolling::{RollingFileAppender, Rotation},
};
use tracing_subscriber::{fmt as tracing_fmt, prelude::*, EnvFilter};
use tracing_subscriber::{
filter::{Directive, FromEnvError, LevelFilter},
fmt as tracing_fmt,
prelude::*,
EnvFilter,
};
pub mod api;
pub mod custom_uri;
@ -111,19 +116,20 @@ impl Node {
init_data.apply(&node.libraries, &node).await?;
}
// Finally load the libraries from disk into the library manager
node.libraries.init(&node).await?;
// It's important these are run after libraries are loaded!
locations_actor.start(node.clone());
jobs_actor.start(node.clone());
node.p2p.start(p2p_stream, node.clone());
// Finally load the libraries from disk into the library manager
node.libraries.init(&node).await?;
let router = api::mount();
info!("Spacedrive online.");
Ok((node, router))
}
pub fn init_logger(data_dir: impl AsRef<Path>) -> WorkerGuard {
pub fn init_logger(data_dir: impl AsRef<Path>) -> Result<WorkerGuard, FromEnvError> {
let (logfile, guard) = NonBlocking::new(
RollingFileAppender::builder()
.filename_prefix("sd.log")
@ -133,6 +139,17 @@ impl Node {
.expect("Error setting up log file!"),
);
// Set a default if the user hasn't set an override
if env::var("RUST_LOG") == Err(env::VarError::NotPresent) {
let directive: Directive = if cfg!(debug_assertions) {
LevelFilter::DEBUG
} else {
LevelFilter::INFO
}
.into();
env::set_var("RUST_LOG", directive.to_string());
}
let collector = tracing_subscriber::registry()
.with(
tracing_fmt::Subscriber::new()
@ -142,49 +159,12 @@ impl Node {
.with(
tracing_fmt::Subscriber::new()
.with_writer(std::io::stdout)
.with_filter(if cfg!(debug_assertions) {
EnvFilter::from_default_env()
.add_directive(
"warn".parse().expect("Error invalid tracing directive!"),
)
.add_directive(
"sd_core=debug"
.parse()
.expect("Error invalid tracing directive!"),
)
.add_directive(
"sd_core::location::manager=info"
.parse()
.expect("Error invalid tracing directive!"),
)
.add_directive(
"sd_core_mobile=debug"
.parse()
.expect("Error invalid tracing directive!"),
)
// .add_directive(
// "sd_p2p=debug"
// .parse()
// .expect("Error invalid tracing directive!"),
// )
.add_directive(
"server=debug"
.parse()
.expect("Error invalid tracing directive!"),
)
.add_directive(
"spacedrive=debug"
.parse()
.expect("Error invalid tracing directive!"),
)
.add_directive(
"rspc=debug"
.parse()
.expect("Error invalid tracing directive!"),
)
} else {
EnvFilter::from("info")
}),
.with_filter(
EnvFilter::builder()
.from_env()?
// We don't want to flood the logs
.add_directive("sd_core::location::manager=info".parse()?),
),
);
tracing::collect::set_global_default(collector)
@ -193,13 +173,19 @@ impl Node {
})
.ok();
let prev_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
error!("{}", panic_info);
prev_hook(panic_info);
std::panic::set_hook(Box::new(move |panic| {
if let Some(location) = panic.location() {
tracing::error!(
message = %panic,
panic.file = format!("{}:{}", location.file(), location.line()),
panic.column = location.column(),
);
} else {
tracing::error!(message = %panic);
}
}));
guard
Ok(guard)
}
pub async fn shutdown(&self) {
@ -254,4 +240,6 @@ pub enum NodeError {
#[cfg(debug_assertions)]
#[error("Init config error: {0}")]
InitConfig(#[from] util::debug_initializer::InitConfigError),
#[error("logger error: {0}")]
Logger(#[from] FromEnvError),
}

View file

@ -49,6 +49,8 @@ pub struct Library {
/// p2p identity
pub identity: Arc<Identity>,
pub orphan_remover: OrphanRemoverActor,
// The UUID which matches `config.instance_id`'s primary key.
pub instance_uuid: Uuid,
notifications: notifications::Notifications,
@ -63,6 +65,7 @@ impl Debug for Library {
// troublesome to implement Debug trait
f.debug_struct("LibraryContext")
.field("id", &self.id)
.field("instance_uuid", &self.instance_uuid)
.field("config", &self.config)
.field("db", &self.db)
.finish()
@ -73,6 +76,7 @@ impl Library {
pub async fn new(
id: Uuid,
config: LibraryConfig,
instance_uuid: Uuid,
identity: Arc<Identity>,
db: Arc<PrismaClient>,
node: &Arc<Node>,
@ -87,6 +91,7 @@ impl Library {
identity: identity.clone(),
orphan_remover: OrphanRemoverActor::spawn(db),
notifications: node.notifications.clone(),
instance_uuid,
event_bus_tx: node.event_bus.0.clone(),
})
}

View file

@ -384,6 +384,7 @@ impl Libraries {
let library = Library::new(
id,
config,
instance_id,
identity,
// key_manager,
db,

View file

@ -250,6 +250,9 @@ impl Actor {
//└── <cas_id>[0..2]/ # sharding
// └── <cas_id>.webp
fs::create_dir_all(&thumbnails_directory)
.await
.map_err(|e| FileIOError::from((thumbnails_directory, e)))?;
let mut read_dir = fs::read_dir(thumbnails_directory)
.await
.map_err(|e| FileIOError::from((thumbnails_directory, e)))?;

View file

@ -45,6 +45,15 @@ pub enum P2PEvent {
peer_id: PeerId,
metadata: PeerMetadata,
},
ExpiredPeer {
peer_id: PeerId,
},
ConnectedPeer {
peer_id: PeerId,
},
DisconnectedPeer {
peer_id: PeerId,
},
SpacedropRequest {
id: Uuid,
peer_id: PeerId,
@ -108,10 +117,6 @@ impl P2PManager {
let pairing = PairingManager::new(manager.clone(), tx.clone(), metadata_manager.clone());
// TODO: proper shutdown
// https://docs.rs/ctrlc/latest/ctrlc/
// https://docs.rs/system_shutdown/latest/system_shutdown/
Ok((
Arc::new(Self {
pairing,
@ -141,11 +146,6 @@ impl P2PManager {
while let Some(event) = stream.next().await {
match event {
Event::PeerDiscovered(event) => {
debug!(
"Discovered peer by id '{}' with address '{:?}' and metadata: {:?}",
event.peer_id, event.addresses, event.metadata
);
events
.send(P2PEvent::DiscoveredPeer {
peer_id: event.peer_id,
@ -156,12 +156,22 @@ impl P2PManager {
node.nlm.peer_discovered(event).await;
}
Event::PeerExpired { id, metadata } => {
debug!("Peer '{}' expired with metadata: {:?}", id, metadata);
Event::PeerExpired { id, .. } => {
events
.send(P2PEvent::ExpiredPeer { peer_id: id })
.map_err(|_| error!("Failed to send event to p2p event stream!"))
.ok();
node.nlm.peer_expired(id).await;
}
Event::PeerConnected(event) => {
debug!("Peer '{}' connected", event.peer_id);
events
.send(P2PEvent::ConnectedPeer {
peer_id: event.peer_id,
})
.map_err(|_| error!("Failed to send event to p2p event stream!"))
.ok();
node.nlm.peer_connected(event.peer_id).await;
if event.establisher {
@ -175,7 +185,11 @@ impl P2PManager {
}
}
Event::PeerDisconnected(peer_id) => {
debug!("Peer '{}' disconnected", peer_id);
events
.send(P2PEvent::DisconnectedPeer { peer_id })
.map_err(|_| error!("Failed to send event to p2p event stream!"))
.ok();
node.nlm.peer_disconnected(peer_id).await;
}
Event::PeerMessage(event) => {
@ -297,7 +311,7 @@ impl P2PManager {
shutdown = true;
break;
}
_ => debug!("event: {:?}", event),
_ => {}
}
}

View file

@ -286,6 +286,7 @@ impl PairingManager {
.exec()
.await
.unwrap();
library_manager.update_instances(library.clone()).await;
stream
.write_all(

View file

@ -1,9 +1,10 @@
use std::{collections::HashMap, env, str::FromStr};
use itertools::Itertools;
use sd_p2p::{spacetunnel::RemoteIdentity, Metadata};
use sd_p2p::{spacetunnel::RemoteIdentity, Metadata, PeerId};
use serde::{Deserialize, Serialize};
use specta::Type;
use tracing::warn;
use crate::node::Platform;
@ -54,7 +55,7 @@ impl Metadata for PeerMetadata {
map
}
fn from_hashmap(data: &HashMap<String, String>) -> Result<Self, String>
fn from_hashmap(peer_id: &PeerId, data: &HashMap<String, String>) -> Result<Self, String>
where
Self: Sized,
{
@ -81,20 +82,31 @@ impl Metadata for PeerMetadata {
i += 1;
}
if instances.is_empty() {
return Err("DNS record for field 'instances' missing. Unable to decode 'PeerMetadata'!"
.to_owned());
}
instances
.split(',')
.map(|s| {
.filter_map(|s| {
// "".split(",").collect::<Vec<_>>() == [""]
if s.is_empty() {
return None;
}
RemoteIdentity::from_bytes(
&hex::decode(s).map_err(|_| "Unable to decode instance!")?,
&hex::decode(s)
.map_err(|e| {
warn!(
"Unable to parse instance from peer '{peer_id}'s metadata!"
);
e
})
.ok()?,
)
.map_err(|_| "Unable to parse instance!")
.map_err(|e| {
warn!("Unable to parse instance from peer '{peer_id}'s metadata!");
e
})
.ok()
})
.collect::<Result<Vec<_>, _>>()?
.collect::<Vec<_>>()
},
})
}

View file

@ -7,6 +7,8 @@ use sd_p2p::{
DiscoveredPeer, PeerId,
};
use sd_sync::CRDTOperation;
use serde::Serialize;
use specta::Type;
use sync::GetOpsArgs;
use tokio::{
@ -26,17 +28,20 @@ use super::{Header, IdentityOrRemoteIdentity, P2PManager, PeerMetadata};
mod proto;
pub use proto::*;
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Serialize, Type)]
pub enum InstanceState {
Unavailable,
Discovered(PeerId),
Connected(PeerId),
}
#[derive(Debug, Clone, Serialize, Type)]
pub struct LibraryData {
instances: HashMap<RemoteIdentity /* Identity public key */, InstanceState>,
}
type LibrariesMap = HashMap<Uuid /* Library ID */, LibraryData>;
pub struct NetworkedLibraries {
p2p: Arc<P2PManager>,
pub(crate) libraries: RwLock<HashMap<Uuid /* Library ID */, LibraryData>>,
@ -112,11 +117,11 @@ impl NetworkedLibraries {
.filter_map(|i| {
// TODO: Error handling
match IdentityOrRemoteIdentity::from_bytes(&i.identity).unwrap() {
IdentityOrRemoteIdentity::Identity(identity) => {
Some((identity.to_remote_identity(), InstanceState::Unavailable))
}
// We don't own it so don't advertise it
IdentityOrRemoteIdentity::RemoteIdentity(_) => None,
IdentityOrRemoteIdentity::Identity(_) => None,
IdentityOrRemoteIdentity::RemoteIdentity(identity) => {
Some((identity, InstanceState::Unavailable))
}
}
})
.collect(),
@ -212,6 +217,10 @@ impl NetworkedLibraries {
}
}
}
pub async fn state(&self) -> LibrariesMap {
self.libraries.read().await.clone()
}
}
// These functions could be moved to some separate protocol abstraction
@ -291,7 +300,7 @@ mod originator {
.unwrap();
tunnel.flush().await.unwrap();
while let Ok(rx::GetOperations(args)) =
while let Ok(rx::GetOperations::Operations(args)) =
rx::GetOperations::from_stream(&mut tunnel).await
{
let ops = sync.get_ops(args).await.unwrap();
@ -318,28 +327,31 @@ mod responder {
use originator::tx as rx;
pub mod tx {
use serde::{Deserialize, Serialize};
use super::*;
pub struct GetOperations(pub GetOpsArgs);
#[derive(Serialize, Deserialize)]
pub enum GetOperations {
Operations(GetOpsArgs),
Done,
}
impl GetOperations {
// TODO: Per field errors for better error handling
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> std::io::Result<Self> {
Ok(Self(
Ok(
// TODO: Error handling
rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(),
))
)
}
pub fn to_bytes(&self) -> Vec<u8> {
let Self(ops) = self;
let mut buf = vec![];
// TODO: Error handling
encode::buf(&mut buf, &rmp_serde::to_vec_named(&ops).unwrap());
encode::buf(&mut buf, &rmp_serde::to_vec_named(&self).unwrap());
buf
}
}
@ -349,6 +361,16 @@ mod responder {
let ingest = &library.sync.ingest;
let Ok(mut rx) = ingest.req_rx.try_lock() else {
println!("Rejected sync due to libraries lock being held!");
// TODO: Proper error returned to remote instead of this.
// TODO: We can't just abort the connection when the remote is expecting data.
tunnel
.write_all(&tx::GetOperations::Done.to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
return;
};
@ -369,7 +391,7 @@ mod responder {
tunnel
.write_all(
&tx::GetOperations(sync::GetOpsArgs {
&tx::GetOperations::Operations(sync::GetOpsArgs {
clocks: timestamps,
count: OPS_PER_REQUEST,
})
@ -391,5 +413,11 @@ mod responder {
.await
.expect("TODO: Handle ingest channel closed, so we don't loose ops");
}
tunnel
.write_all(&tx::GetOperations::Done.to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
}
}

View file

@ -11,7 +11,7 @@ edition = { workspace = true }
[dependencies]
ffmpeg-sys-next = "6.0.1"
tracing = "0.1.37"
tracing = { workspace = true }
thiserror = "1.0.40"
webp = "0.2.2"

View file

@ -27,7 +27,7 @@ if-watch = { version = "3.0.1", features = [
] } # Override the features of if-watch which is used by libp2p-quic
mdns-sd = "0.6.1"
thiserror = "1.0.40"
tracing = "0.1.37"
tracing = { workspace = true }
serde = { version = "1.0.163", features = ["derive"] }
rmp-serde = "1.1.1"
specta = { workspace = true }
@ -41,4 +41,4 @@ hex = "0.4.3"
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread"] }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }

View file

@ -1,6 +1,6 @@
use std::{collections::HashMap, env, time::Duration};
use sd_p2p::{Event, Keypair, Manager, Metadata, MetadataManager};
use sd_p2p::{Event, Keypair, Manager, Metadata, MetadataManager, PeerId};
use tokio::{io::AsyncReadExt, time::sleep};
use tracing::{debug, error, info};
@ -14,7 +14,7 @@ impl Metadata for PeerMetadata {
HashMap::from([("name".to_owned(), self.name)])
}
fn from_hashmap(data: &HashMap<String, String>) -> Result<Self, String>
fn from_hashmap(_: &PeerId, data: &HashMap<String, String>) -> Result<Self, String>
where
Self: Sized,
{

View file

@ -12,7 +12,7 @@ use tokio::{
sync::{mpsc, RwLock},
time::{sleep_until, Instant, Sleep},
};
use tracing::{error, info, warn};
use tracing::{debug, error, trace, warn};
use crate::{DiscoveredPeer, Event, Manager, Metadata, MetadataManager, PeerId};
@ -38,6 +38,7 @@ where
mdns_service_receiver: flume::Receiver<ServiceEvent>,
service_name: String,
next_mdns_advertisement: Pin<Box<Sleep>>,
next_allowed_discovery_advertisement: Instant,
trigger_advertisement: mpsc::UnboundedReceiver<()>,
pub(crate) state: Arc<MdnsState<TMetadata>>,
}
@ -70,6 +71,7 @@ where
mdns_service_receiver,
service_name,
next_mdns_advertisement: Box::pin(sleep_until(Instant::now())), // Trigger an advertisement immediately
next_allowed_discovery_advertisement: Instant::now(),
trigger_advertisement: advertise_rx,
state: state.clone(),
},
@ -84,6 +86,13 @@ where
/// Do an mdns advertisement to the network.
async fn advertise(&mut self) {
self.inner_advertise().await;
self.next_mdns_advertisement =
Box::pin(sleep_until(Instant::now() + MDNS_READVERTISEMENT_INTERVAL));
}
async fn inner_advertise(&self) {
let metadata = self.metadata_manager.get().to_hashmap();
// In simple terms, this converts from `Vec<(ip, port)>` to `Vec<(Vec<Ip>, port)>`
@ -119,15 +128,12 @@ where
}
for (_, service) in services.into_iter() {
info!("advertising mdns service: {:?}", service);
trace!("advertising mdns service: {:?}", service);
match self.mdns_daemon.register(service) {
Ok(_) => {}
Err(err) => warn!("error registering mdns service: {}", err),
}
}
self.next_mdns_advertisement =
Box::pin(sleep_until(Instant::now() + MDNS_READVERTISEMENT_INTERVAL));
}
// TODO: if the channel's sender is dropped will this cause the `tokio::select` in the `manager.rs` to infinitely loop?
@ -153,6 +159,7 @@ where
}
match TMetadata::from_hashmap(
&peer_id,
&info
.get_properties()
.iter()
@ -165,9 +172,19 @@ where
self.state.discovered.write().await;
let peer = if let Some(peer) = discovered_peers.remove(&peer_id) {
peer
} else {
// Found a new peer, let's readvertise our mdns service as it may have just come online
// `self.next_allowed_discovery_advertisement` is to prevent DoS-style attacks.
let now = Instant::now();
if self.next_allowed_discovery_advertisement <= now {
self.next_allowed_discovery_advertisement = now + Duration::from_secs(1);
self.inner_advertise().await;
self.next_mdns_advertisement =
Box::pin(sleep_until(Instant::now() + MDNS_READVERTISEMENT_INTERVAL));
}
DiscoveredPeer {
manager: manager.clone(),
peer_id,
@ -188,11 +205,13 @@ where
discovered_peers.insert(peer_id, peer.clone());
peer
};
debug!(
"Discovered peer by id '{}' with address '{:?}' and metadata: {:?}",
peer.peer_id, peer.addresses, peer.metadata
);
return Some(Event::PeerDiscovered(peer));
}
Err(err) => {
error!("error parsing metadata for peer '{}': {}", raw_peer_id, err)
}
Err(err) => error!("error parsing metadata for peer '{}': {}", raw_peer_id, err)
}
}
Err(_) => warn!(
@ -216,9 +235,11 @@ where
self.state.discovered.write().await;
let peer = discovered_peers.remove(&peer_id);
let metadata = peer.map(|p| p.metadata);
debug!("Peer '{peer_id}' expired with metadata: {metadata:?}");
return Some(Event::PeerExpired {
id: peer_id,
metadata: peer.map(|p| p.metadata),
metadata,
});
}
}

View file

@ -10,12 +10,12 @@ use std::{
use libp2p::{core::muxing::StreamMuxerBox, swarm::SwarmBuilder, Transport};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, warn};
use tracing::{error, trace, warn};
use crate::{
spacetime::{SpaceTime, UnicastStream},
DiscoveredPeer, Keypair, ManagerStream, ManagerStreamAction, Mdns, MdnsState, Metadata,
MetadataManager, PeerId,
DiscoveredPeer, Keypair, ManagerStream, ManagerStreamAction, ManagerStreamAction2, Mdns,
MdnsState, Metadata, MetadataManager, PeerId,
};
/// Is the core component of the P2P system that holds the state and delegates actions to the other components
@ -25,7 +25,8 @@ pub struct Manager<TMetadata: Metadata> {
pub(crate) peer_id: PeerId,
pub(crate) application_name: &'static [u8],
pub(crate) stream_id: AtomicU64,
event_stream_tx: mpsc::Sender<ManagerStreamAction<TMetadata>>,
event_stream_tx: mpsc::Sender<ManagerStreamAction>,
event_stream_tx2: mpsc::Sender<ManagerStreamAction2<TMetadata>>,
}
impl<TMetadata: Metadata> Manager<TMetadata> {
@ -42,7 +43,8 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
.ok_or(ManagerError::InvalidAppName)?;
let peer_id = PeerId(keypair.raw_peer_id());
let (event_stream_tx, event_stream_rx) = mpsc::channel(1024);
let (event_stream_tx, event_stream_rx) = mpsc::channel(128);
let (event_stream_tx2, event_stream_rx2) = mpsc::channel(128);
let (mdns, mdns_state) = Mdns::new(application_name, peer_id, metadata_manager)
.await
@ -58,6 +60,7 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
stream_id: AtomicU64::new(0),
peer_id,
event_stream_tx,
event_stream_tx2,
});
let mut swarm = SwarmBuilder::with_tokio_executor(
@ -74,13 +77,13 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
let listener_id = swarm
.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse().expect("Error passing libp2p multiaddr. This value is hardcoded so this should be impossible."))
.unwrap();
debug!("created ipv4 listener with id '{:?}'", listener_id);
trace!("created ipv4 listener with id '{:?}'", listener_id);
}
{
let listener_id = swarm
.listen_on("/ip6/::/udp/0/quic-v1".parse().expect("Error passing libp2p multiaddr. This value is hardcoded so this should be impossible."))
.unwrap();
debug!("created ipv4 listener with id '{:?}'", listener_id);
trace!("created ipv4 listener with id '{:?}'", listener_id);
}
Ok((
@ -88,6 +91,7 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
ManagerStream {
manager: this,
event_stream_rx,
event_stream_rx2,
swarm,
mdns,
queued_events: Default::default(),
@ -97,7 +101,7 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
))
}
pub(crate) async fn emit(&self, event: ManagerStreamAction<TMetadata>) {
pub(crate) async fn emit(&self, event: ManagerStreamAction) {
match self.event_stream_tx.send(event).await {
Ok(_) => {}
Err(err) => warn!("error emitting event: {}", err),
@ -131,12 +135,19 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
}
// TODO: Does this need any timeouts to be added cause hanging forever is bad?
// be aware this method is `!Sync` so can't be used from rspc. // TODO: Can this limitation be removed?
#[allow(clippy::unused_unit)] // TODO: Remove this clippy override once error handling is added
pub async fn stream(&self, peer_id: PeerId) -> Result<UnicastStream, ()> {
// TODO: With this system you can send to any random peer id. Can I reduce that by requiring `.connect(peer_id).unwrap().send(data)` or something like that.
let (tx, rx) = oneshot::channel();
self.emit(ManagerStreamAction::StartStream(peer_id, tx))
.await;
match self
.event_stream_tx2
.send(ManagerStreamAction2::StartStream(peer_id, tx))
.await
{
Ok(_) => {}
Err(err) => warn!("error emitting event: {}", err),
}
let mut stream = rx.await.map_err(|_| {
warn!("failed to queue establishing stream to peer '{peer_id}'!");

View file

@ -17,7 +17,7 @@ use libp2p::{
Swarm,
};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};
use crate::{
quic_multiaddr_to_socketaddr, socketaddr_to_quic_multiaddr,
@ -26,9 +26,9 @@ use crate::{
};
/// TODO
pub enum ManagerStreamAction<TMetadata: Metadata> {
/// Events are returned to the application via the `ManagerStream::next` method.
Event(Event<TMetadata>),
///
/// This is `Sync` so it can be used from within rspc.
pub enum ManagerStreamAction {
/// TODO
GetConnectedPeers(oneshot::Sender<Vec<PeerId>>),
/// Tell the [`libp2p::Swarm`](libp2p::Swarm) to establish a new connection to a peer.
@ -37,20 +37,34 @@ pub enum ManagerStreamAction<TMetadata: Metadata> {
addresses: Vec<SocketAddr>,
},
/// TODO
StartStream(PeerId, oneshot::Sender<UnicastStream>),
/// TODO
BroadcastData(Vec<u8>),
/// the node is shutting down. The `ManagerStream` should convert this into `Event::Shutdown`
Shutdown(oneshot::Sender<()>),
}
impl<TMetadata: Metadata> fmt::Debug for ManagerStreamAction<TMetadata> {
/// TODO: Get rid of this and merge into `ManagerStreamAction` without breaking rspc procedures
///
/// This is `!Sync` so can't be used from within rspc.
pub enum ManagerStreamAction2<TMetadata: Metadata> {
/// Events are returned to the application via the `ManagerStream::next` method.
Event(Event<TMetadata>),
/// TODO
StartStream(PeerId, oneshot::Sender<UnicastStream>),
}
impl fmt::Debug for ManagerStreamAction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("ManagerStreamAction")
}
}
impl<TMetadata: Metadata> From<Event<TMetadata>> for ManagerStreamAction<TMetadata> {
impl<TMetadata: Metadata> fmt::Debug for ManagerStreamAction2<TMetadata> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("ManagerStreamAction2")
}
}
impl<TMetadata: Metadata> From<Event<TMetadata>> for ManagerStreamAction2<TMetadata> {
fn from(event: Event<TMetadata>) -> Self {
Self::Event(event)
}
@ -59,7 +73,8 @@ impl<TMetadata: Metadata> From<Event<TMetadata>> for ManagerStreamAction<TMetada
/// TODO
pub struct ManagerStream<TMetadata: Metadata> {
pub(crate) manager: Arc<Manager<TMetadata>>,
pub(crate) event_stream_rx: mpsc::Receiver<ManagerStreamAction<TMetadata>>,
pub(crate) event_stream_rx: mpsc::Receiver<ManagerStreamAction>,
pub(crate) event_stream_rx2: mpsc::Receiver<ManagerStreamAction2<TMetadata>>,
pub(crate) swarm: Swarm<SpaceTime<TMetadata>>,
pub(crate) mdns: Mdns<TMetadata>,
pub(crate) queued_events: VecDeque<Event<TMetadata>>,
@ -67,6 +82,25 @@ pub struct ManagerStream<TMetadata: Metadata> {
pub(crate) on_establish_streams: HashMap<libp2p::PeerId, Vec<OutboundRequest>>,
}
enum EitherManagerStreamAction<TMetadata: Metadata> {
A(ManagerStreamAction),
B(ManagerStreamAction2<TMetadata>),
}
impl<TMetadata: Metadata> From<ManagerStreamAction> for EitherManagerStreamAction<TMetadata> {
fn from(event: ManagerStreamAction) -> Self {
Self::A(event)
}
}
impl<TMetadata: Metadata> From<ManagerStreamAction2<TMetadata>>
for EitherManagerStreamAction<TMetadata>
{
fn from(event: ManagerStreamAction2<TMetadata>) -> Self {
Self::B(event)
}
}
impl<TMetadata> ManagerStream<TMetadata>
where
TMetadata: Metadata,
@ -92,14 +126,20 @@ where
},
event = self.event_stream_rx.recv() => {
// If the sender has shut down we return `None` to also shut down too.
if let Some(event) = self.handle_manager_stream_action(event?).await {
if let Some(event) = self.handle_manager_stream_action(event?.into()).await {
return Some(event);
}
}
event = self.event_stream_rx2.recv() => {
// If the sender has shut down we return `None` to also shut down too.
if let Some(event) = self.handle_manager_stream_action(event?.into()).await {
return Some(event);
}
}
event = self.swarm.select_next_some() => {
match event {
SwarmEvent::Behaviour(event) => {
if let Some(event) = self.handle_manager_stream_action(event).await {
if let Some(event) = self.handle_manager_stream_action(event.into()).await {
if let Event::Shutdown { .. } = event {
self.shutdown.store(true, Ordering::Relaxed);
}
@ -128,7 +168,7 @@ where
SwarmEvent::NewListenAddr { address, .. } => {
match quic_multiaddr_to_socketaddr(address) {
Ok(addr) => {
debug!("listen address added: {}", addr);
trace!("listen address added: {}", addr);
self.mdns.register_addr(addr).await;
return Some(Event::AddListenAddr(addr));
},
@ -141,7 +181,7 @@ where
SwarmEvent::ExpiredListenAddr { address, .. } => {
match quic_multiaddr_to_socketaddr(address) {
Ok(addr) => {
debug!("listen address added: {}", addr);
trace!("listen address expired: {}", addr);
self.mdns.unregister_addr(&addr).await;
return Some(Event::RemoveListenAddr(addr));
},
@ -152,11 +192,11 @@ where
}
}
SwarmEvent::ListenerClosed { listener_id, addresses, reason } => {
debug!("listener '{:?}' was closed due to: {:?}", listener_id, reason);
trace!("listener '{:?}' was closed due to: {:?}", listener_id, reason);
for address in addresses {
match quic_multiaddr_to_socketaddr(address) {
Ok(addr) => {
debug!("listen address added: {}", addr);
trace!("listen address closed: {}", addr);
self.mdns.unregister_addr(&addr).await;
self.queued_events.push_back(Event::RemoveListenAddr(addr));
@ -182,50 +222,24 @@ where
async fn handle_manager_stream_action(
&mut self,
event: ManagerStreamAction<TMetadata>,
event: EitherManagerStreamAction<TMetadata>,
) -> Option<Event<TMetadata>> {
match event {
ManagerStreamAction::Event(event) => return Some(event),
ManagerStreamAction::GetConnectedPeers(response) => {
response
.send(
self.swarm
.connected_peers()
.map(|v| PeerId(*v))
.collect::<Vec<_>>(),
)
.map_err(|_| {
error!("Error sending response to `GetConnectedPeers` request! Sending was dropped!")
})
.ok();
}
ManagerStreamAction::Dial { peer_id, addresses } => {
match self.swarm.dial(
DialOpts::peer_id(peer_id.0)
.condition(PeerCondition::Disconnected)
.addresses(addresses.iter().map(socketaddr_to_quic_multiaddr).collect())
.build(),
) {
Ok(()) => {}
Err(err) => warn!(
"error dialing peer '{}' with addresses '{:?}': {}",
peer_id, addresses, err
),
EitherManagerStreamAction::A(event) => match event {
ManagerStreamAction::GetConnectedPeers(response) => {
response
.send(
self.swarm
.connected_peers()
.map(|v| PeerId(*v))
.collect::<Vec<_>>(),
)
.map_err(|_| {
error!("Error sending response to `GetConnectedPeers` request! Sending was dropped!")
})
.ok();
}
}
ManagerStreamAction::StartStream(peer_id, tx) => {
if !self.swarm.connected_peers().any(|v| *v == peer_id.0) {
let addresses = self
.mdns
.state
.discovered
.read()
.await
.get(&peer_id)
.unwrap()
.addresses
.clone();
ManagerStreamAction::Dial { peer_id, addresses } => {
match self.swarm.dial(
DialOpts::peer_id(peer_id.0)
.condition(PeerCondition::Disconnected)
@ -238,43 +252,74 @@ where
peer_id, addresses, err
),
}
self.on_establish_streams
.entry(peer_id.0)
.or_default()
.push(OutboundRequest::Unicast(tx));
} else {
self.swarm
.behaviour_mut()
.pending_events
.push_back(ToSwarm::NotifyHandler {
peer_id: peer_id.0,
}
ManagerStreamAction::BroadcastData(data) => {
let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
let behaviour = self.swarm.behaviour_mut();
debug!("Broadcasting message to '{:?}'", connected_peers);
for peer_id in connected_peers {
behaviour.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: OutboundRequest::Unicast(tx),
event: OutboundRequest::Broadcast(data.clone()),
});
}
}
}
ManagerStreamAction::BroadcastData(data) => {
let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
let behaviour = self.swarm.behaviour_mut();
debug!("Broadcasting message to '{:?}'", connected_peers);
for peer_id in connected_peers {
behaviour.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: OutboundRequest::Broadcast(data.clone()),
ManagerStreamAction::Shutdown(tx) => {
info!("Shutting down P2P Manager...");
self.mdns.shutdown().await;
tx.send(()).unwrap_or_else(|_| {
warn!("Error sending shutdown signal to P2P Manager!");
});
}
}
ManagerStreamAction::Shutdown(tx) => {
info!("Shutting down P2P Manager...");
self.mdns.shutdown().await;
tx.send(()).unwrap_or_else(|_| {
warn!("Error sending shutdown signal to P2P Manager!");
});
return Some(Event::Shutdown);
}
return Some(Event::Shutdown);
}
},
EitherManagerStreamAction::B(event) => match event {
ManagerStreamAction2::Event(event) => return Some(event),
ManagerStreamAction2::StartStream(peer_id, tx) => {
if !self.swarm.connected_peers().any(|v| *v == peer_id.0) {
let addresses = self
.mdns
.state
.discovered
.read()
.await
.get(&peer_id)
.unwrap()
.addresses
.clone();
match self.swarm.dial(
DialOpts::peer_id(peer_id.0)
.condition(PeerCondition::Disconnected)
.addresses(
addresses.iter().map(socketaddr_to_quic_multiaddr).collect(),
)
.build(),
) {
Ok(()) => {}
Err(err) => warn!(
"error dialing peer '{}' with addresses '{:?}': {}",
peer_id, addresses, err
),
}
self.on_establish_streams
.entry(peer_id.0)
.or_default()
.push(OutboundRequest::Unicast(tx));
} else {
self.swarm.behaviour_mut().pending_events.push_back(
ToSwarm::NotifyHandler {
peer_id: peer_id.0,
handler: NotifyHandler::Any,
event: OutboundRequest::Unicast(tx),
},
);
}
}
},
}
None

View file

@ -16,7 +16,7 @@ use libp2p::{
use thiserror::Error;
use tracing::debug;
use crate::{ConnectedPeer, Event, Manager, ManagerStreamAction, Metadata, PeerId};
use crate::{ConnectedPeer, Event, Manager, ManagerStreamAction2, Metadata, PeerId};
use super::SpaceTimeConnection;
@ -53,7 +53,7 @@ impl<TMetadata: Metadata> SpaceTime<TMetadata> {
impl<TMetadata: Metadata> NetworkBehaviour for SpaceTime<TMetadata> {
type ConnectionHandler = SpaceTimeConnection<TMetadata>;
type OutEvent = ManagerStreamAction<TMetadata>;
type OutEvent = ManagerStreamAction2<TMetadata>;
fn handle_established_inbound_connection(
&mut self,
@ -116,13 +116,14 @@ impl<TMetadata: Metadata> NetworkBehaviour for SpaceTime<TMetadata> {
debug!("sending establishment request to peer '{}'", peer_id);
if other_established == 0 {
self.pending_events.push_back(ToSwarm::GenerateEvent(
ManagerStreamAction::Event(Event::PeerConnected(ConnectedPeer {
Event::PeerConnected(ConnectedPeer {
peer_id,
establisher: match endpoint {
ConnectedPoint::Dialer { .. } => true,
ConnectedPoint::Listener { .. } => false,
},
})),
})
.into(),
));
}
}
@ -136,7 +137,7 @@ impl<TMetadata: Metadata> NetworkBehaviour for SpaceTime<TMetadata> {
if remaining_established == 0 {
debug!("Disconnected from peer '{}'", peer_id);
self.pending_events.push_back(ToSwarm::GenerateEvent(
ManagerStreamAction::Event(Event::PeerDisconnected(peer_id)),
Event::PeerDisconnected(peer_id).into(),
));
}
}

View file

@ -14,7 +14,7 @@ use std::{
};
use tracing::error;
use crate::{Manager, ManagerStreamAction, Metadata, PeerId};
use crate::{Manager, ManagerStreamAction2, Metadata, PeerId};
use super::{InboundProtocol, OutboundProtocol, OutboundRequest, EMPTY_QUEUE_SHRINK_THRESHOLD};
@ -49,7 +49,7 @@ impl<TMetadata: Metadata> SpaceTimeConnection<TMetadata> {
impl<TMetadata: Metadata> ConnectionHandler for SpaceTimeConnection<TMetadata> {
type InEvent = OutboundRequest;
type OutEvent = ManagerStreamAction<TMetadata>;
type OutEvent = ManagerStreamAction2<TMetadata>;
type Error = ConnectionHandlerUpgrErr<io::Error>;
type InboundProtocol = InboundProtocol<TMetadata>;
type OutboundProtocol = OutboundProtocol;

View file

@ -11,7 +11,7 @@ use tracing::debug;
use crate::{
spacetime::{BroadcastStream, UnicastStream},
Manager, ManagerStreamAction, Metadata, PeerId, PeerMessageEvent,
Manager, ManagerStreamAction2, Metadata, PeerId, PeerMessageEvent,
};
use super::SpaceTimeProtocolName;
@ -31,7 +31,7 @@ impl<TMetadata: Metadata> UpgradeInfo for InboundProtocol<TMetadata> {
}
impl<TMetadata: Metadata> InboundUpgrade<NegotiatedSubstream> for InboundProtocol<TMetadata> {
type Output = ManagerStreamAction<TMetadata>;
type Output = ManagerStreamAction2<TMetadata>;
type Error = ();
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send + 'static>>;
@ -48,7 +48,7 @@ impl<TMetadata: Metadata> InboundUpgrade<NegotiatedSubstream> for InboundProtoco
match discriminator {
crate::spacetime::BROADCAST_DISCRIMINATOR => {
debug!("stream({}, {id}): broadcast stream accepted", self.peer_id);
Ok(ManagerStreamAction::Event(
Ok(ManagerStreamAction2::Event(
PeerMessageEvent {
stream_id: id,
peer_id: self.peer_id,
@ -62,7 +62,7 @@ impl<TMetadata: Metadata> InboundUpgrade<NegotiatedSubstream> for InboundProtoco
crate::spacetime::UNICAST_DISCRIMINATOR => {
debug!("stream({}, {id}): unicast stream accepted", self.peer_id);
Ok(ManagerStreamAction::Event(
Ok(ManagerStreamAction2::Event(
PeerMessageEvent {
stream_id: id,
peer_id: self.peer_id,

View file

@ -2,6 +2,8 @@ use std::hash::{Hash, Hasher};
use ed25519_dalek::PublicKey;
use rand_core::OsRng;
use serde::Serialize;
use specta::Type;
use thiserror::Error;
#[derive(Debug, Error)]
@ -58,6 +60,21 @@ impl std::fmt::Debug for RemoteIdentity {
}
}
/// Serializes the identity as the lowercase hex encoding of the underlying
/// public-key bytes (a plain string on the wire).
impl Serialize for RemoteIdentity {
	fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
		let hex_key = hex::encode(self.0.as_bytes());
		serializer.serialize_str(&hex_key)
	}
}
/// Exposes `RemoteIdentity` to specta as a plain string type, matching the
/// hex-string form produced by this type's `Serialize` impl.
impl Type for RemoteIdentity {
	fn inline(
		_opts: specta::DefOpts,
		_generics: &[specta::DataType],
	) -> Result<specta::DataType, specta::ExportError> {
		// The generated TS binding is just `string`.
		Ok(specta::DataType::Primitive(specta::PrimitiveType::String))
	}
}
impl RemoteIdentity {
pub fn from_bytes(bytes: &[u8]) -> Result<Self, IdentityErr> {
Ok(Self(ed25519_dalek::PublicKey::from_bytes(bytes)?))

View file

@ -1,10 +1,12 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt::Debug};
use crate::PeerId;
/// this trait must be implemented for the metadata type to allow it to be converted to MDNS DNS records.
pub trait Metadata: Clone + Send + Sync + 'static {
pub trait Metadata: Debug + Clone + Send + Sync + 'static {
fn to_hashmap(self) -> HashMap<String, String>;
fn from_hashmap(data: &HashMap<String, String>) -> Result<Self, String>
fn from_hashmap(peer_id: &PeerId, data: &HashMap<String, String>) -> Result<Self, String>
where
Self: Sized;
}

View file

@ -1,4 +1,11 @@
import { useBridgeMutation, useDiscoveredPeers, useFeatureFlag } from '@sd/client';
import {
isEnabled,
useBridgeMutation,
useBridgeQuery,
useConnectedPeers,
useDiscoveredPeers,
useFeatureFlag
} from '@sd/client';
import { Button } from '@sd/ui';
import { startPairing } from '~/app/p2p/pairing';
import { Heading } from '../Layout';
@ -23,34 +30,69 @@ export const Component = () => {
// TODO: This entire component shows a UI which is pairing by node but that is just not how it works.
function IncorrectP2PPairingPane() {
const onlineNodes = useDiscoveredPeers();
const connectedNodes = useConnectedPeers();
const p2pPair = useBridgeMutation('p2p.pair', {
onSuccess(data) {
console.log(data);
}
});
const nlmState = useBridgeQuery(['p2p.nlmState'], {
refetchInterval: 1000
});
const libraries = useBridgeQuery(['library.list']);
return (
<>
<h1>Pairing</h1>
{[...onlineNodes.entries()].map(([id, node]) => (
<div key={id} className="flex space-x-2">
<p>{node.name}</p>
<div className="flex-space-4 flex w-full">
<div className="flex-[50%]">
<h1>Pairing</h1>
{[...onlineNodes.entries()].map(([id, node]) => (
<div key={id} className="flex space-x-2">
<p>{node.name}</p>
<Button
onClick={() => {
// TODO: This is not great
p2pPair.mutateAsync(id).then((id) =>
startPairing(id, {
name: node.name,
os: node.operating_system
})
);
}}
>
Pair
</Button>
<Button
onClick={() => {
// TODO: This is not great
p2pPair.mutateAsync(id).then((id) =>
startPairing(id, {
name: node.name,
os: node.operating_system
})
);
}}
>
Pair
</Button>
</div>
))}
</div>
))}
<div className="flex-[50%]">
<h1 className="mt-4">Connected</h1>
{[...connectedNodes.entries()].map(([id, node]) => (
<div key={id} className="flex space-x-2">
<p>{id}</p>
</div>
))}
</div>
</div>
<div>
<p>NLM State:</p>
<pre className="pl-5">{JSON.stringify(nlmState.data || {}, undefined, 2)}</pre>
</div>
<div>
<p>Libraries:</p>
{libraries.data?.map((v) => (
<div key={v.uuid} className="pb-2">
<p>
{v.config.name} - {v.uuid}
</p>
<div className="pl-5">
<p>Instance: {`${v.config.instance_id}/${v.instance_id}`}</p>
<p>Instance PK: {`${v.instance_public_key}`}</p>
</div>
</div>
))}
</div>
</>
);
}

View file

@ -24,6 +24,7 @@ export type Procedures = {
{ key: "notifications.dismiss", input: NotificationId, result: null } |
{ key: "notifications.dismissAll", input: never, result: null } |
{ key: "notifications.get", input: never, result: Notification[] } |
{ key: "p2p.nlmState", input: never, result: { [key: string]: LibraryData } } |
{ key: "preferences.get", input: LibraryArgs<null>, result: LibraryPreferences } |
{ key: "search.ephemeralPaths", input: LibraryArgs<NonIndexedPath>, result: NonIndexedFileSystemEntries } |
{ key: "search.objects", input: LibraryArgs<ObjectSearchArgs>, result: SearchData<ExplorerItem> } |
@ -178,6 +179,8 @@ export type IndexerRule = { id: number; pub_id: number[]; name: string | null; d
*/
export type IndexerRuleCreateArgs = { name: string; dry_run: boolean; rules: ([RuleKind, string[]])[] }
export type InstanceState = "Unavailable" | { Discovered: PeerId } | { Connected: PeerId }
export type InvalidateOperationEvent = { key: string; arg: any; result: any | null }
export type JobGroup = { id: string; action: string | null; status: JobStatus; created_at: string; jobs: JobReport[] }
@ -198,7 +201,9 @@ export type LibraryArgs<T> = { library_id: string; arg: T }
*/
export type LibraryConfig = { name: LibraryName; description: string | null; instance_id: number }
export type LibraryConfigWrapped = { uuid: string; config: LibraryConfig }
export type LibraryConfigWrapped = { uuid: string; instance_id: string; instance_public_key: string; config: LibraryConfig }
export type LibraryData = { instances: { [key: string]: InstanceState } }
export type LibraryName = string
@ -285,7 +290,7 @@ export type OptionalRange<T> = { from: T | null; to: T | null }
/**
* TODO: P2P event for the frontend
*/
export type P2PEvent = { type: "DiscoveredPeer"; peer_id: PeerId; metadata: PeerMetadata } | { type: "SpacedropRequest"; id: string; peer_id: PeerId; name: string } | { type: "PairingRequest"; id: number; name: string; os: OperatingSystem } | { type: "PairingProgress"; id: number; status: PairingStatus }
export type P2PEvent = { type: "DiscoveredPeer"; peer_id: PeerId; metadata: PeerMetadata } | { type: "ExpiredPeer"; peer_id: PeerId } | { type: "ConnectedPeer"; peer_id: PeerId } | { type: "DisconnectedPeer"; peer_id: PeerId } | { type: "SpacedropRequest"; id: string; peer_id: PeerId; name: string } | { type: "PairingRequest"; id: number; name: string; os: OperatingSystem } | { type: "PairingProgress"; id: number; status: PairingStatus }
export type PairingDecision = { decision: "accept"; libraryId: string } | { decision: "reject" }

View file

@ -12,6 +12,7 @@ import { useBridgeSubscription } from '../rspc';
type Context = {
discoveredPeers: Map<string, PeerMetadata>;
connectedPeers: Map<string, undefined>;
pairingStatus: Map<number, PairingStatus>;
events: MutableRefObject<EventTarget>;
};
@ -21,6 +22,7 @@ const Context = createContext<Context>(null as any);
export function P2PContextProvider({ children }: PropsWithChildren) {
const events = useRef(new EventTarget());
const [[discoveredPeers], setDiscoveredPeer] = useState([new Map<string, PeerMetadata>()]);
const [[connectedPeers], setConnectedPeers] = useState([new Map<string, undefined>()]);
const [[pairingStatus], setPairingStatus] = useState([new Map<number, PairingStatus>()]);
useBridgeSubscription(['p2p.events'], {
@ -28,7 +30,17 @@ export function P2PContextProvider({ children }: PropsWithChildren) {
events.current.dispatchEvent(new CustomEvent('p2p-event', { detail: data }));
if (data.type === 'DiscoveredPeer') {
setDiscoveredPeer([discoveredPeers.set(data.peer_id, data.metadata)]);
discoveredPeers.set(data.peer_id, data.metadata);
setDiscoveredPeer([discoveredPeers]);
} else if (data.type === 'ExpiredPeer') {
discoveredPeers.delete(data.peer_id);
setDiscoveredPeer([discoveredPeers]);
} else if (data.type === 'ConnectedPeer') {
connectedPeers.set(data.peer_id, undefined);
setConnectedPeers([connectedPeers]);
} else if (data.type === 'DisconnectedPeer') {
connectedPeers.delete(data.peer_id);
setConnectedPeers([connectedPeers]);
} else if (data.type === 'PairingProgress') {
setPairingStatus([pairingStatus.set(data.id, data.status)]);
}
@ -39,6 +51,7 @@ export function P2PContextProvider({ children }: PropsWithChildren) {
<Context.Provider
value={{
discoveredPeers,
connectedPeers,
pairingStatus,
events
}}
@ -52,6 +65,10 @@ export function useDiscoveredPeers() {
return useContext(Context).discoveredPeers;
}
// Hook returning the live map of currently connected peer ids (values unused for now).
export function useConnectedPeers() {
	const { connectedPeers } = useContext(Context);
	return connectedPeers;
}
// Hook returning the latest status for a given pairing session, or undefined if none.
export function usePairingStatus(pairing_id: number) {
	const { pairingStatus } = useContext(Context);
	return pairingStatus.get(pairing_id);
}