P2P holepunching + relay (#2147)

* Basic AF server

* wip

* Add autonat to relay server

* Add autonat client + fixes

* Deploy script

* wip

* Debug view

* wip

* wip

* relay all events

* wip

* fix

* wip

* libp2p man spoke

* dctur

* Relay config file

* Advertise relay server

* Dynamic relay configuration

* wip

* p2p relay config

* cleanup

* push instances into p2p state

* fix

* Fix up TS
This commit is contained in:
Oscar Beaumont 2024-03-12 14:18:58 +08:00 committed by GitHub
parent d1fa6af7be
commit 926ae4fc65
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 1635 additions and 879 deletions

1507
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,18 +1,17 @@
[workspace]
resolver = "2"
members = [
"core",
"core/crates/*",
"crates/*",
# "crates/p2p/tunnel",
# "crates/p2p/tunnel/utils",
"apps/cli",
"apps/desktop/src-tauri",
"apps/desktop/crates/*",
"apps/mobile/modules/sd-core/core",
"apps/mobile/modules/sd-core/android/crate",
"apps/mobile/modules/sd-core/ios/crate",
"apps/server",
"core",
"core/crates/*",
"crates/*",
"apps/cli",
"apps/p2p-relay",
"apps/desktop/src-tauri",
"apps/desktop/crates/*",
"apps/mobile/modules/sd-core/core",
"apps/mobile/modules/sd-core/android/crate",
"apps/mobile/modules/sd-core/ios/crate",
"apps/server",
]
[workspace.package]
@ -23,18 +22,18 @@ repository = "https://github.com/spacedriveapp/spacedrive"
[workspace.dependencies]
# First party dependencies
prisma-client-rust = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [
"sqlite-create-many",
"migrations",
"sqlite",
"sqlite-create-many",
"migrations",
"sqlite",
], default-features = false }
prisma-client-rust-cli = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [
"specta",
"sqlite-create-many",
"migrations",
"sqlite",
"specta",
"sqlite-create-many",
"migrations",
"sqlite",
], default-features = false }
prisma-client-rust-sdk = { git = "https://github.com/spacedriveapp/prisma-client-rust", rev = "f99d6f5566570f3ab1edecb7a172ad25b03d95af", features = [
"sqlite",
"sqlite",
], default-features = false }
tracing = "0.1.40"

24
apps/p2p-relay/Cargo.toml Normal file
View file

@ -0,0 +1,24 @@
[package]
name = "sd-p2p-relay"
version = "0.0.1"
publish = false
license.workspace = true
edition.workspace = true
repository.workspace = true
[dependencies]
hex.workspace = true
libp2p = { version = "0.53.2", features = [
"tokio",
"quic",
"relay",
"autonat",
"macros",
] }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tracing.workspace = true
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
uuid = { workspace = true, features = ["serde", "v4"] }

12
apps/p2p-relay/deploy.sh Executable file
View file

@ -0,0 +1,12 @@
#!/bin/bash
# A temporary script to deploy the p2p-relay to the server for testing
set -e
SERVER="54.176.132.155"
TARGET_DIR=$(cargo metadata | jq -r .target_directory)
cargo zigbuild --target aarch64-unknown-linux-musl --release
echo "$TARGET_DIR/aarch64-unknown-linux-musl/release/sd-p2p-relay"
scp "$TARGET_DIR/aarch64-unknown-linux-musl/release/sd-p2p-relay" ec2-user@$SERVER:/home/ec2-user/sd-p2p-relay

View file

@ -0,0 +1,75 @@
use std::{borrow::Cow, path::Path};
use libp2p::identity::Keypair;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
// Unique ID of this relay server.
pub id: Uuid,
// URL of the cloud API.
#[serde(skip_serializing_if = "Option::is_none")]
api_url: Option<String>,
// Secret used for authenticating with cloud backend.
pub p2p_secret: String,
// Port to listen on.
#[serde(skip_serializing_if = "Option::is_none")]
port: Option<u16>,
// Private/public keypair to use for the relay.
#[serde(with = "keypair")]
pub keypair: Keypair,
}
impl Config {
pub fn init(
path: impl AsRef<Path>,
p2p_secret: String,
) -> Result<Self, Box<dyn std::error::Error>> {
let config = Self {
id: Uuid::new_v4(),
api_url: None,
p2p_secret,
port: None,
keypair: Keypair::generate_ed25519(),
};
std::fs::write(path, serde_json::to_string_pretty(&config)?)?;
Ok(config)
}
pub fn load(path: impl AsRef<Path>) -> Result<Self, Box<dyn std::error::Error>> {
let config = std::fs::read_to_string(path)?;
Ok(serde_json::from_str(&config)?)
}
pub fn api_url(&self) -> Cow<'_, str> {
match self.api_url {
Some(ref url) => Cow::Borrowed(url),
None => Cow::Borrowed("https://app.spacedrive.com"),
}
}
pub fn port(&self) -> u16 {
self.port.unwrap_or(7373) // TODO: Should we use HTTPS port to avoid strict internet filters???
}
}
mod keypair {
use libp2p::identity::Keypair;
use serde::{de::Error, Deserialize, Deserializer, Serializer};
pub fn deserialize<'de, D>(deserializer: D) -> Result<Keypair, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let bytes = hex::decode(s).map_err(D::Error::custom)?;
Keypair::from_protobuf_encoding(bytes.as_slice()).map_err(D::Error::custom)
}
pub fn serialize<S: Serializer>(v: &Keypair, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_str(&hex::encode(
v.to_protobuf_encoding().expect("invalid keypair type"),
))
}
}

217
apps/p2p-relay/src/main.rs Normal file
View file

@ -0,0 +1,217 @@
use std::{
io::{stdin, stdout, Write},
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
path::PathBuf,
};
use libp2p::{
autonat,
futures::StreamExt,
relay,
swarm::{NetworkBehaviour, SwarmEvent},
};
use reqwest::header::{self, HeaderMap, HeaderValue};
use serde::{Deserialize, Serialize};
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::utils::socketaddr_to_quic_multiaddr;
mod config;
mod utils;
// TODO: Authentication with the Spacedrive Cloud
// TODO: Rate-limit data usage by Spacedrive account.
// TODO: Expose libp2p metrics like - https://github.com/mxinden/rust-libp2p-server/blob/master/src/behaviour.rs
#[derive(NetworkBehaviour)]
pub struct Behaviour {
relay: relay::Behaviour,
autonat: autonat::Behaviour,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RelayServerEntry {
id: Uuid,
// TODO: Try and drop this field cause it's libp2p specific
peer_id: String,
addrs: Vec<SocketAddr>,
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
// .with_env_filter(EnvFilter::from_default_env()) // TODO: ???
.init();
let config_path =
PathBuf::from(std::env::var("CONFIG_PATH").unwrap_or("./config.json".to_string()));
let mut args = std::env::args();
args.next(); // Skip binary name
if args.next().as_deref() == Some("init") {
println!("Initializing config at '{config_path:?}'...");
if config_path.exists() {
panic!("Config already exists at path '{config_path:?}'. Please delete it first!");
// TODO: Error handling
}
print!("Please enter the p2p secret: ");
let mut p2p_secret = String::new();
let _ = stdout().flush();
stdin()
.read_line(&mut p2p_secret)
.expect("Did not enter a correct string");
config::Config::init(&config_path, p2p_secret.replace('\n', "")).unwrap(); // TODO: Error handling
println!("\nSuccessfully initialized config at '{config_path:?}'!");
return;
}
if !config_path.exists() {
panic!("Unable to find config at path '{config_path:?}'. Please create it!"); // TODO: Error handling
}
let config = config::Config::load(&config_path).unwrap(); // TODO: Error handling
info!("Starting...");
let public_ipv4: Ipv4Addr = reqwest::get("https://api.ipify.org")
.await
.unwrap() // TODO: Error handling
.text()
.await
.unwrap() // TODO: Error handling
.parse()
.unwrap(); // TODO: Error handling
let public_ipv6: Option<Ipv6Addr> = match reqwest::get("https://api6.ipify.org").await {
Ok(v) => Some(
v.text()
.await
.unwrap() // TODO: Error handling
.parse()
.unwrap(), // TODO: Error handling
),
Err(_) => {
warn!("Error getting public IPv6 address. Skipping IPv6 configuration.");
None
}
};
info!("Determined public addresses of the current relay to be: '{public_ipv4}' and '{public_ipv6:?}'");
let (first_advertisement_tx, mut first_advertisement_rx) = tokio::sync::mpsc::channel(1);
tokio::spawn({
let config = config.clone();
async move {
let client = reqwest::Client::new();
let mut first_advertisement_tx = Some(first_advertisement_tx);
loop {
let result = client
.post(format!("{}/api/p2p/relays", config.api_url()))
.headers({
let mut map = HeaderMap::new();
map.insert(
header::AUTHORIZATION,
HeaderValue::from_str(&format!("Bearer {}", config.p2p_secret))
.unwrap(),
);
map
})
.json(&RelayServerEntry {
id: config.id,
peer_id: config.keypair.public().to_peer_id().to_base58(),
addrs: {
let mut ips: Vec<SocketAddr> =
vec![SocketAddr::from((public_ipv4, config.port()))];
if let Some(ip) = public_ipv6 {
ips.push(SocketAddr::from((ip, config.port())));
}
ips
},
})
.send()
.await;
let mut is_ok = result.is_ok();
match result {
Ok(result) => {
if result.status() != 200 {
error!(
"Failed to register relay server with cloud status {}: {:?}",
result.status(),
result.text().await
);
is_ok = false;
} else {
info!(
"Successfully registered '{}' as relay server with cloud",
config.id
);
}
}
Err(e) => error!("Failed to register relay server with cloud: {e}"),
}
if let Some(tx) = first_advertisement_tx.take() {
tx.send(is_ok).await.ok();
}
tokio::time::sleep(std::time::Duration::from_secs(9 * 60)).await;
}
}
});
if !first_advertisement_rx
.recv()
.await
.expect("Advertisement task died during startup!")
{
panic!(
"Failed to register relay server with cloud. Please check your config and try again."
); // TODO: Error handling
}
// TODO: Setup logging to filesystem with auto-rotation
let peer_id = config.keypair.public().to_peer_id();
let mut swarm = libp2p::SwarmBuilder::with_existing_identity(config.keypair.clone())
.with_tokio()
.with_quic()
.with_behaviour(|key| Behaviour {
relay: relay::Behaviour::new(key.public().to_peer_id(), Default::default()), // TODO: Proper config
autonat: autonat::Behaviour::new(key.public().to_peer_id(), Default::default()), // TODO: Proper config
})
.unwrap() // TODO: Error handling
.build();
swarm
.listen_on(socketaddr_to_quic_multiaddr(&SocketAddr::from((
Ipv6Addr::UNSPECIFIED,
config.port(),
))))
.unwrap(); // TODO: Error handling
swarm
.listen_on(socketaddr_to_quic_multiaddr(&SocketAddr::from((
Ipv4Addr::UNSPECIFIED,
config.port(),
))))
.unwrap(); // TODO: Error handling
info!("Started Relay as PeerId '{peer_id}'");
loop {
match swarm.next().await.expect("Infinite Stream.") {
// SwarmEvent::Behaviour(event) => {
// println!("{event:?}")
// }
SwarmEvent::NewListenAddr { address, .. } => {
info!("Listening on {address:?}");
}
event => println!("{event:?}"),
}
}
}

View file

@ -0,0 +1,17 @@
//! This file contains some fairly meaningless glue code for integrating with libp2p.
use std::net::SocketAddr;
use libp2p::{multiaddr::Protocol, Multiaddr};
#[must_use]
pub(crate) fn socketaddr_to_quic_multiaddr(m: &SocketAddr) -> Multiaddr {
let mut addr = Multiaddr::empty();
match m {
SocketAddr::V4(ip) => addr.push(Protocol::Ip4(*ip.ip())),
SocketAddr::V6(ip) => addr.push(Protocol::Ip6(*ip.ip())),
}
addr.push(Protocol::Udp(m.port()));
addr.push(Protocol::QuicV1);
addr
}

View file

@ -25,15 +25,15 @@ sd-core-sync = { path = "./crates/sync" }
# sd-cloud-api = { path = "../crates/cloud-api" }
sd-file-path-helper = { path = "../crates/file-path-helper" }
sd-crypto = { path = "../crates/crypto", features = [
"sys",
"tokio",
"sys",
"tokio",
], optional = true }
sd-ffmpeg = { path = "../crates/ffmpeg", optional = true }
sd-file-ext = { path = "../crates/file-ext" }
sd-images = { path = "../crates/images", features = [
"rspc",
"serde",
"specta",
"rspc",
"serde",
"specta",
] }
sd-media-metadata = { path = "../crates/media-metadata" }
sd-p2p2 = { path = "../crates/p2p2", features = ["specta"] }
@ -64,12 +64,12 @@ regex = { workspace = true }
reqwest = { workspace = true, features = ["json", "native-tls-vendored"] }
rmp-serde = { workspace = true }
rspc = { workspace = true, features = [
"axum",
"uuid",
"chrono",
"tracing",
"alpha",
"unstable",
"axum",
"uuid",
"chrono",
"tracing",
"alpha",
"unstable",
] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@ -79,12 +79,12 @@ strum_macros = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = [
"sync",
"rt-multi-thread",
"io-util",
"macros",
"time",
"process",
"sync",
"rt-multi-thread",
"io-util",
"macros",
"time",
"process",
] }
tokio-stream = { workspace = true, features = ["fs"] }
tokio-util = { workspace = true, features = ["io"] }
@ -111,7 +111,7 @@ itertools = "0.12.0"
libc = "0.2.153"
mini-moka = "0.10.2"
notify = { git = "https://github.com/notify-rs/notify.git", rev = "c3929ed114fbb0bc7457a9a498260461596b00ca", default-features = false, features = [
"macos_fsevent",
"macos_fsevent",
] }
rmpv = { workspace = true }
serde-hashkey = "0.4.5"
@ -144,10 +144,10 @@ plist = "1"
[target.'cfg(target_os = "ios")'.dependencies]
icrate = { version = "0.1.0", features = [
"Foundation",
"Foundation_NSFileManager",
"Foundation_NSString",
"Foundation_NSNumber",
"Foundation",
"Foundation_NSFileManager",
"Foundation_NSString",
"Foundation_NSNumber",
] }
[dev-dependencies]

View file

@ -52,7 +52,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
let mut stream = peer
.ok_or(rspc::Error::new(
ErrorCode::InternalServerError,
"big man, offline".into(),
"big man, not found".into(),
))?
.new_stream()
.await

View file

@ -1,8 +1,6 @@
use sd_sync::*;
use serde::{Deserialize, Serialize};
use std::sync::{atomic, Arc};
use tokio::sync::Notify;
use uuid::Uuid;
use crate::{library::Library, Node};

View file

@ -1,38 +1,103 @@
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use sd_p2p2::P2P;
use sd_p2p2::{flume::bounded, HookEvent, IdentityOrRemoteIdentity, PeerConnectionCandidate, P2P};
use tracing::error;
use crate::library::{Libraries, LibraryManagerEvent};
pub fn start(p2p: Arc<P2P>, libraries: Arc<Libraries>) {
tokio::spawn(async move {
if let Err(err) = libraries
.rx
.clone()
.subscribe(|msg| {
let p2p = p2p.clone();
async move {
match msg {
LibraryManagerEvent::InstancesModified(library)
| LibraryManagerEvent::Load(library) => {
p2p.metadata_mut().insert(
library.id.to_string(),
library.identity.to_remote_identity().to_string(),
);
}
LibraryManagerEvent::Edit(_library) => {
// TODO: Send changes to all connected nodes or queue sending for when they are online!
}
LibraryManagerEvent::Delete(library) => {
p2p.metadata_mut().remove(&library.id.to_string());
/// A P2P hook which integrates P2P into Spacedrive's library system.
///
/// This hooks is responsible for:
/// - injecting library peers into the P2P system so we can connect to them over internet.
///
pub struct LibrariesHook {}
impl LibrariesHook {
pub fn spawn(p2p: Arc<P2P>, libraries: Arc<Libraries>) -> Self {
let (tx, rx) = bounded(15);
let hook_id = p2p.register_hook("sd-libraries-hook", tx);
let handle = tokio::spawn(async move {
if let Err(err) = libraries
.rx
.clone()
.subscribe(|msg| {
let p2p = p2p.clone();
async move {
match msg {
LibraryManagerEvent::InstancesModified(library)
| LibraryManagerEvent::Load(library) => {
p2p.metadata_mut().insert(
library.id.to_string(),
library.identity.to_remote_identity().to_string(),
);
let Ok(instances) =
library.db.instance().find_many(vec![]).exec().await
else {
return;
};
for i in instances.iter() {
let identity =
IdentityOrRemoteIdentity::from_bytes(&i.identity)
.expect("lol: invalid DB entry")
.remote_identity();
p2p.clone().discover_peer(
hook_id,
identity,
HashMap::new(), // TODO: We should probs cache this so we have something
[PeerConnectionCandidate::Relay].into_iter().collect(),
);
}
}
LibraryManagerEvent::Edit(_library) => {
// TODO: Send changes to all connected nodes or queue sending for when they are online!
}
LibraryManagerEvent::Delete(library) => {
p2p.metadata_mut().remove(&library.id.to_string());
let Ok(instances) =
library.db.instance().find_many(vec![]).exec().await
else {
return;
};
for i in instances.iter() {
let identity =
IdentityOrRemoteIdentity::from_bytes(&i.identity)
.expect("lol: invalid DB entry")
.remote_identity();
let peers = p2p.peers();
let Some(peer) = peers.get(&identity) else {
continue;
};
peer.undiscover_peer(hook_id);
}
}
}
}
})
.await
{
error!("Core may become unstable! `LibraryServices::start` manager aborted with error: {err:?}");
}
});
tokio::spawn(async move {
while let Ok(event) = rx.recv_async().await {
match event {
HookEvent::Shutdown { _guard } => {
handle.abort();
break;
}
_ => continue,
}
})
.await
{
error!("Core may become unstable! `LibraryServices::start` manager aborted with error: {err:?}");
}
});
}
});
Self {}
}
}

View file

@ -3,7 +3,7 @@ use crate::{
config::{self, P2PDiscoveryState, Port},
get_hardware_model_name, HardwareModel,
},
p2p::{libraries, operations, sync::SyncMessage, Header, OperatingSystem, SPACEDRIVE_APP_ID},
p2p::{operations, sync::SyncMessage, Header, OperatingSystem, SPACEDRIVE_APP_ID},
Node,
};
@ -11,7 +11,8 @@ use axum::routing::IntoMakeService;
use sd_p2p2::{
flume::{bounded, Receiver},
Libp2pPeerId, Listener, Mdns, Peer, QuicTransport, RemoteIdentity, UnicastStream, P2P,
Libp2pPeerId, Listener, Mdns, Peer, QuicTransport, RelayServerEntry, RemoteIdentity,
UnicastStream, P2P,
};
use sd_p2p_tunnel::Tunnel;
use serde::Serialize;
@ -22,6 +23,7 @@ use std::{
convert::Infallible,
net::SocketAddr,
sync::{atomic::AtomicBool, Arc, Mutex, PoisonError},
time::Duration,
};
use tower_service::Service;
use tracing::error;
@ -30,16 +32,16 @@ use tokio::sync::oneshot;
use tracing::info;
use uuid::Uuid;
use super::{P2PEvents, PeerMetadata};
use super::{libraries::LibrariesHook, P2PEvents, PeerMetadata};
pub struct P2PManager {
pub(crate) p2p: Arc<P2P>,
mdns: Mutex<Option<Mdns>>,
quic: QuicTransport,
// TODO: Make private
pub quic: QuicTransport,
// The `libp2p::PeerId`. This is for debugging only, use `RemoteIdentity` instead.
lp2p_peer_id: Libp2pPeerId,
pub(crate) events: P2PEvents,
// connect_hook: ConnectHook,
pub(super) spacedrop_pairing_reqs: Arc<Mutex<HashMap<Uuid, oneshot::Sender<Option<String>>>>>,
pub(super) spacedrop_cancellations: Arc<Mutex<HashMap<Uuid, Arc<AtomicBool>>>>,
pub(crate) node_config: Arc<config::Manager>,
@ -65,14 +67,13 @@ impl P2PManager {
mdns: Mutex::new(None),
quic,
events: P2PEvents::spawn(p2p.clone()),
// connect_hook: ConnectHook::spawn(p2p),
spacedrop_pairing_reqs: Default::default(),
spacedrop_cancellations: Default::default(),
node_config,
});
this.on_node_config_change().await;
libraries::start(this.p2p.clone(), libraries);
LibrariesHook::spawn(this.p2p.clone(), libraries);
info!(
"Node RemoteIdentity('{}') libp2p::PeerId('{:?}') is now online listening at addresses: {:?}",
@ -81,8 +82,43 @@ impl P2PManager {
this.p2p.listeners()
);
Ok((this.clone(), |node, router| {
tokio::spawn(start(this, node, rx, router));
Ok((this.clone(), |node: Arc<Node>, router| {
tokio::spawn(start(this.clone(), node.clone(), rx, router));
// TODO: Cleanup this thread on p2p shutdown.
tokio::spawn(async move {
let client = reqwest::Client::new();
loop {
match client
.get(format!("{}/api/p2p/relays", node.env.api_url.lock().await))
.send()
.await
{
Ok(resp) => {
if resp.status() != 200 {
error!(
"Failed to pull p2p relay configuration: {} {:?}",
resp.status(),
resp.text().await
);
} else {
match resp.json::<Vec<RelayServerEntry>>().await {
Ok(config) => {
this.quic.relay_config(config).await;
info!("Updated p2p relay configuration successfully.")
}
Err(err) => {
error!("Failed to parse p2p relay configuration: {err:?}")
}
}
}
}
Err(err) => error!("Error pulling p2p relay configuration: {err:?}"),
}
tokio::time::sleep(Duration::from_secs(11 * 60)).await;
}
});
}))
}

View file

@ -7,7 +7,7 @@ edition.workspace = true
repository.workspace = true
[dependencies]
ed25519-dalek = "2.1.0"
ed25519-dalek = "2.1.1"
thiserror.workspace = true
tokio = { workspace = true, features = ["io-util"] }
uuid.workspace = true

View file

@ -30,16 +30,25 @@ tokio = { workspace = true, features = [
tokio-stream = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true, features = ["compat"] }
tracing = { workspace = true }
uuid = { workspace = true }
uuid = { workspace = true, features = ["serde"] }
ed25519-dalek = { version = "2.1.0", features = [] }
ed25519-dalek = { version = "2.1.1", features = [] }
flume = "=0.11.0" # Must match version used by `mdns-sd`
futures-core = "0.3.30"
if-watch = { version = "=3.2.0", features = [
"tokio",
] } # Override the features of if-watch which is used by libp2p-quic
libp2p = { version = "0.53.2", features = ["tokio", "serde"] }
libp2p-quic = { version = "0.10.2", features = ["tokio"] }
libp2p = { version = "0.53.2", features = [
"tokio",
"serde",
"macros",
"quic",
"autonat",
"relay",
"yamux",
"noise",
"dcutr",
] }
libp2p-stream = "0.1.0-alpha"
mdns-sd = "0.10.3"
rand_core = { version = "0.6.4" }
@ -50,6 +59,7 @@ sha256 = "1.5.0"
stable-vec = "0.4.0"
hash_map_diff = "0.2.0"
sync_wrapper = "0.1.2"
reqwest.workspace = true
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

View file

@ -1,9 +1,14 @@
use std::{collections::HashSet, fmt, net::SocketAddr, sync::Arc};
use std::{
collections::{BTreeSet, HashSet},
fmt,
net::SocketAddr,
sync::Arc,
};
use flume::Sender;
use tokio::sync::oneshot;
use crate::{Peer, RemoteIdentity};
use crate::{Peer, PeerConnectionCandidate, RemoteIdentity};
#[derive(Debug, Clone)]
pub enum HookEvent {
@ -100,7 +105,12 @@ impl Hook {
let _ = self.tx.send(event);
}
pub fn acceptor(&self, id: ListenerId, peer: &Arc<Peer>, addrs: &HashSet<SocketAddr>) {
pub fn acceptor(
&self,
id: ListenerId,
peer: &Arc<Peer>,
addrs: &BTreeSet<PeerConnectionCandidate>,
) {
if let Some(listener) = &self.listener {
(listener.acceptor.0)(id, peer, addrs);
}
@ -115,8 +125,9 @@ pub(crate) struct ListenerData {
/// This is a function over a channel because we need to ensure the code runs prior to the peer being emitted to the application.
/// If not the peer would have no registered way to connect to it initially which would be confusing.
#[allow(clippy::type_complexity)]
pub acceptor:
HandlerFn<Arc<dyn Fn(ListenerId, &Arc<Peer>, &HashSet<SocketAddr>) + Send + Sync>>,
pub acceptor: HandlerFn<
Arc<dyn Fn(ListenerId, &Arc<Peer>, &BTreeSet<PeerConnectionCandidate>) + Send + Sync>,
>,
}
/// A little wrapper for functions to make them `Debug`.

View file

@ -16,8 +16,8 @@ pub use identity::{
};
pub use mdns::Mdns;
pub use p2p::{Listener, P2P};
pub use peer::{ConnectionRequest, Peer};
pub use quic::{Libp2pPeerId, QuicTransport};
pub use peer::{ConnectionRequest, Peer, PeerConnectionCandidate};
pub use quic::{Libp2pPeerId, QuicTransport, RelayServerEntry};
pub use smart_guards::SmartWriteGuard;
pub use stream::UnicastStream;

View file

@ -7,7 +7,7 @@ use mdns_sd::{ServiceDaemon, ServiceEvent, ServiceInfo};
use tokio::time::{sleep_until, Instant, Sleep};
use tracing::{error, trace, warn};
use crate::{HookEvent, HookId, RemoteIdentity, ShutdownGuard, P2P};
use crate::{HookEvent, HookId, PeerConnectionCandidate, RemoteIdentity, ShutdownGuard, P2P};
/// The time between re-advertising the mDNS service.
const MDNS_READVERTISEMENT_INTERVAL: Duration = Duration::from_secs(60); // Every minute re-advertise
@ -134,7 +134,9 @@ fn on_event(state: &State, event: ServiceEvent) {
.collect(),
info.get_addresses()
.iter()
.map(|addr| SocketAddr::new(*addr, info.get_port()))
.map(|addr| {
PeerConnectionCandidate::SocketAddr(SocketAddr::new(*addr, info.get_port()))
})
.collect(),
);
}

View file

@ -1,5 +1,5 @@
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
net::SocketAddr,
sync::{Arc, PoisonError, RwLock, RwLockReadGuard},
time::Duration,
@ -15,7 +15,7 @@ use tracing::info;
use crate::{
hooks::{HandlerFn, Hook, HookEvent, ListenerData, ListenerId, ShutdownGuard},
smart_guards::SmartWriteGuard,
HookId, Identity, Peer, RemoteIdentity, UnicastStream,
HookId, Identity, Peer, PeerConnectionCandidate, RemoteIdentity, UnicastStream,
};
/// Manager for the entire P2P system.
@ -122,7 +122,7 @@ impl P2P {
hook_id: HookId,
identity: RemoteIdentity,
metadata: HashMap<String, String>,
addrs: HashSet<SocketAddr>,
addrs: BTreeSet<PeerConnectionCandidate>,
) -> Arc<Peer> {
let mut peers = self.peers.write().unwrap_or_else(PoisonError::into_inner);
let peer = peers.entry(identity);
@ -135,7 +135,8 @@ impl P2P {
.clone();
{
let mut state = peer.state.write().unwrap_or_else(PoisonError::into_inner);
let mut state: std::sync::RwLockWriteGuard<'_, crate::peer::State> =
peer.state.write().unwrap_or_else(PoisonError::into_inner);
state.discovered.insert(hook_id, addrs.clone());
}
@ -231,7 +232,10 @@ impl P2P {
&self,
name: &'static str,
tx: Sender<HookEvent>,
acceptor: impl Fn(ListenerId, &Arc<Peer>, &HashSet<SocketAddr>) + Send + Sync + 'static,
acceptor: impl Fn(ListenerId, &Arc<Peer>, &BTreeSet<PeerConnectionCandidate>)
+ Send
+ Sync
+ 'static,
) -> ListenerId {
let mut hooks = self.hooks.write().unwrap_or_else(PoisonError::into_inner);
let hook_id = hooks.push(Hook {

View file

@ -1,5 +1,5 @@
use std::{
collections::{HashMap, HashSet},
collections::{BTreeSet, HashMap, HashSet},
net::SocketAddr,
sync::{Arc, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak},
};
@ -23,6 +23,15 @@ pub struct Peer {
pub(crate) p2p: Weak<P2P>,
}
// The order of this enum is the preference of the connection type.
#[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd, Ord)]
pub enum PeerConnectionCandidate {
SocketAddr(SocketAddr),
Relay,
// Custom(String),
}
#[derive(Debug, Default)]
pub(crate) struct State {
/// Active connections with the remote
@ -31,7 +40,7 @@ pub(crate) struct State {
/// These should be inject by `Listener::acceptor` which is called when a new peer is discovered.
pub(crate) connection_methods: HashMap<ListenerId, mpsc::Sender<ConnectionRequest>>,
/// Methods that have discovered this peer.
pub(crate) discovered: HashMap<HookId, HashSet<SocketAddr>>,
pub(crate) discovered: HashMap<HookId, BTreeSet<PeerConnectionCandidate>>,
}
/// A request to connect to a client.
@ -40,7 +49,7 @@ pub(crate) struct State {
#[non_exhaustive]
pub struct ConnectionRequest {
pub to: RemoteIdentity,
pub addrs: HashSet<SocketAddr>,
pub addrs: BTreeSet<PeerConnectionCandidate>,
pub tx: oneshot::Sender<Result<UnicastStream, String>>,
}
@ -62,7 +71,8 @@ impl PartialEq for Peer {
// Internal methods
impl Peer {
pub(crate) fn new(identity: RemoteIdentity, p2p: Arc<P2P>) -> Arc<Self> {
// TODO: Make this private
pub fn new(identity: RemoteIdentity, p2p: Arc<P2P>) -> Arc<Self> {
Arc::new(Self {
identity,
metadata: Default::default(),
@ -144,7 +154,7 @@ impl Peer {
.values()
.flatten()
.cloned()
.collect::<HashSet<_>>();
.collect::<BTreeSet<_>>();
let Some((_id, connect_tx)) = state
.connection_methods
@ -184,7 +194,7 @@ impl Peer {
// Hook-facing methods
impl Peer {
pub fn hook_discovered(&self, hook: HookId, addrs: HashSet<SocketAddr>) {
pub fn hook_discovered(&self, hook: HookId, addrs: BTreeSet<PeerConnectionCandidate>) {
// TODO: Emit event maybe???
self.state

View file

@ -1,4 +1,4 @@
pub(super) mod transport;
pub(super) mod utils;
pub use transport::{Libp2pPeerId, QuicTransport};
pub use transport::{Libp2pPeerId, QuicTransport, RelayServerEntry};

View file

@ -1,33 +1,37 @@
use std::{
collections::{HashMap, HashSet},
convert::Infallible,
collections::HashMap,
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
str::FromStr,
sync::{Arc, PoisonError, RwLock},
time::Duration,
};
use flume::{bounded, Receiver, Sender};
use libp2p::{
core::muxing::StreamMuxerBox,
autonat, dcutr,
futures::{AsyncReadExt, AsyncWriteExt, StreamExt},
swarm::SwarmEvent,
StreamProtocol, Swarm, SwarmBuilder, Transport,
multiaddr::Protocol,
noise, relay,
swarm::{NetworkBehaviour, SwarmEvent},
yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder,
};
use libp2p_stream::Behaviour;
use serde::{Deserialize, Serialize};
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
time::timeout,
};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, warn};
use tracing::{debug, error, warn};
use uuid::Uuid;
use crate::{
identity::REMOTE_IDENTITY_LEN,
quic::utils::{
identity_to_libp2p_keypair, remote_identity_to_libp2p_peerid, socketaddr_to_quic_multiaddr,
},
ConnectionRequest, HookEvent, ListenerId, RemoteIdentity, UnicastStream, P2P,
ConnectionRequest, HookEvent, ListenerId, PeerConnectionCandidate, RemoteIdentity,
UnicastStream, P2P,
};
const PROTOCOL: StreamProtocol = StreamProtocol::new("/sdp2p/1");
@ -49,6 +53,28 @@ enum InternalEvent {
ipv4: bool,
result: oneshot::Sender<Result<(), String>>,
},
RegisterRelays {
relays: Vec<RelayServerEntry>,
result: oneshot::Sender<Result<(), String>>,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RelayServerEntry {
id: Uuid,
peer_id: String,
addrs: Vec<SocketAddr>,
}
#[derive(NetworkBehaviour)]
struct MyBehaviour {
stream: libp2p_stream::Behaviour,
// TODO: Can this be optional?
relay: relay::client::Behaviour,
// TODO: Can this be optional?
autonat: autonat::Behaviour,
// TODO: Can this be optional?
dcutr: dcutr::Behaviour,
}
/// Transport using Quic to establish a connection between peers.
@ -76,18 +102,20 @@ impl QuicTransport {
peer.listener_available(listener_id, connect_tx.clone());
});
let swarm = ok(ok(SwarmBuilder::with_existing_identity(keypair)
let swarm = SwarmBuilder::with_existing_identity(keypair)
.with_tokio()
.with_other_transport(|keypair| {
libp2p_quic::GenTransport::<libp2p_quic::tokio::Provider>::new(
libp2p_quic::Config::new(keypair),
)
.map(|(p, c), _| (p, StreamMuxerBox::new(c)))
.boxed()
}))
.with_behaviour(|_| Behaviour::new()))
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();
.with_quic()
.with_relay_client(noise::Config::new, yamux::Config::default)
.map_err(|err| err.to_string())?
.with_behaviour(|keypair, relay_behaviour| MyBehaviour {
stream: libp2p_stream::Behaviour::new(),
relay: relay_behaviour,
autonat: autonat::Behaviour::new(keypair.public().to_peer_id(), Default::default()),
dcutr: dcutr::Behaviour::new(keypair.public().to_peer_id()),
})
.map_err(|err| err.to_string())?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();
tokio::spawn(start(p2p.clone(), id, swarm, rx, internal_rx, connect_rx));
@ -101,6 +129,21 @@ impl QuicTransport {
))
}
/// Configure the relay servers to use.
/// This method will replace any existing relay servers.
pub async fn relay_config(&self, relays: Vec<RelayServerEntry>) {
let (tx, rx) = oneshot::channel();
let event = InternalEvent::RegisterRelays { relays, result: tx };
let Ok(_) = self.internal_tx.send(event) else {
return;
};
match rx.await.unwrap_or_else(|_| Ok(())) {
Ok(_) => {}
Err(e) => error!("Failed to register relay config as the event loop has died: {e}"),
}
}
// `None` on the port means disabled. Use `0` for random port.
pub async fn set_ipv4_enabled(&self, port: Option<u16>) -> Result<(), String> {
self.setup_listener(
@ -161,17 +204,10 @@ impl QuicTransport {
}
}
fn ok<T>(v: Result<T, Infallible>) -> T {
match v {
Ok(v) => v,
Err(_) => unreachable!(),
}
}
async fn start(
p2p: Arc<P2P>,
id: ListenerId,
mut swarm: Swarm<Behaviour>,
mut swarm: Swarm<MyBehaviour>,
rx: Receiver<HookEvent>,
internal_rx: Receiver<InternalEvent>,
mut connect_rx: mpsc::Receiver<ConnectionRequest>,
@ -179,10 +215,11 @@ async fn start(
let mut ipv4_listener = None;
let mut ipv6_listener = None;
let mut control = swarm.behaviour().new_control();
let mut control = swarm.behaviour().stream.new_control();
#[allow(clippy::unwrap_used)] // TODO: Error handling
let mut incoming = control.accept(PROTOCOL).unwrap();
let map = Arc::new(RwLock::new(HashMap::new()));
let mut relay_config = Vec::new();
loop {
tokio::select! {
@ -192,27 +229,20 @@ async fn start(
continue;
};
let peer_id = remote_identity_to_libp2p_peerid(&identity);
let addrs = {
let state = peer.state.read().unwrap_or_else(PoisonError::into_inner);
state
.discovered
.values()
.flatten()
.cloned()
.collect::<HashSet<_>>()
get_addrs(peer_id, &relay_config, state.discovered.values().flatten())
};
let peer_id = remote_identity_to_libp2p_peerid(&identity);
let mut control = control.clone();
tokio::spawn(async move {
match timeout(Duration::from_secs(5), control.open_stream_with_addrs(
peer_id,
PROTOCOL,
addrs.iter()
.map(socketaddr_to_quic_multiaddr)
.collect()
addrs
)).await {
Ok(Ok(_)) => {}
Err(_) | Ok(Err(_)) => peer.disconnected_from(id),
@ -332,19 +362,66 @@ async fn start(
}
let _ = result.send(Ok(()));
},
InternalEvent::RegisterRelays { relays, result } => {
// TODO: Replace any existing relays
// TODO: Only add some of the relays???
for relay in &relays {
let peer_id = match PeerId::from_str(&relay.peer_id) {
Ok(peer_id) => peer_id,
Err(err) => {
error!("Failed to parse Relay peer ID '{}': {err:?}", relay.peer_id);
continue;
},
};
let addrs = relay
.addrs
.iter()
.map(socketaddr_to_quic_multiaddr)
.collect::<Vec<_>>();
for addr in addrs {
swarm
.behaviour_mut()
.autonat
.add_server(peer_id, Some(addr.clone()));
swarm.add_peer_address(peer_id, addr);
}
// TODO: Only do this if autonat fails
match swarm.listen_on(
Multiaddr::empty()
.with(Protocol::Memory(40))
.with(Protocol::P2p(peer_id))
.with(Protocol::P2pCircuit)
) {
Ok(_) => {},
Err(e) => {
error!("Failed to listen on relay server '{}': {e}", relay.id);
// TODO: Try again if this fails
},
}
}
relay_config = relays;
// TODO: Proper error handling
result.send(Ok(())).ok();
},
},
Some(req) = connect_rx.recv() => {
let mut control = control.clone();
let self_remote_identity = p2p.identity().to_remote_identity();
let map = map.clone();
let peer_id = remote_identity_to_libp2p_peerid(&req.to);
let addrs = get_addrs(peer_id, &relay_config, req.addrs.iter());
tokio::spawn(async move {
let peer_id = remote_identity_to_libp2p_peerid(&req.to);
match control.open_stream_with_addrs(
peer_id,
PROTOCOL,
req.addrs.iter()
.map(socketaddr_to_quic_multiaddr)
.collect()
addrs,
).await {
Ok(mut stream) => {
map.write().unwrap_or_else(PoisonError::into_inner).insert(peer_id, req.to);
@ -368,3 +445,34 @@ async fn start(
}
}
}
fn get_addrs<'a>(
peer_id: PeerId,
relay_config: &Vec<RelayServerEntry>,
addrs: impl Iterator<Item = &'a PeerConnectionCandidate> + 'a,
) -> Vec<Multiaddr> {
addrs
.map(|v| match v {
PeerConnectionCandidate::SocketAddr(addr) => vec![socketaddr_to_quic_multiaddr(addr)],
PeerConnectionCandidate::Relay => relay_config
.iter()
.filter_map(|e| match PeerId::from_str(&e.peer_id) {
Ok(peer_id) => Some(e.addrs.iter().map(move |addr| (peer_id, addr))),
Err(err) => {
error!("Failed to parse peer ID '{}': {err:?}", e.peer_id);
None
}
})
.flatten()
.map(|(relay_peer_id, addr)| {
let mut addr = socketaddr_to_quic_multiaddr(addr);
addr.push(Protocol::P2p(relay_peer_id));
addr.push(Protocol::P2pCircuit);
addr.push(Protocol::P2p(peer_id));
addr
})
.collect::<Vec<_>>(),
})
.flatten()
.collect::<Vec<_>>()
}

View file

@ -29,14 +29,10 @@ export default function DebugSection() {
<Icon component={Factory} />
Actors
</SidebarLink>
<SidebarLink to="debug/p2p">
<SidebarLink to="debug/p2p/overview">
<Icon component={ShareNetwork} />
P2P
</SidebarLink>
<SidebarLink to="debug/p2p-rspc">
<Icon component={ShareNetwork} />
P2P (rspc)
</SidebarLink>
</div>
</Section>
);

View file

@ -5,6 +5,18 @@ export const debugRoutes = [
{ path: 'cloud', lazy: () => import('./cloud') },
{ path: 'sync', lazy: () => import('./sync') },
{ path: 'actors', lazy: () => import('./actors') },
{ path: 'p2p', lazy: () => import('./p2p') },
{ path: 'p2p-rspc', lazy: () => import('./p2p-rspc') }
{
path: 'p2p',
lazy: () => import('./p2p'),
children: [
{
path: 'overview',
lazy: () => import('./p2p').then((m) => ({ Component: m.Overview }))
},
{
path: 'remote',
lazy: () => import('./p2p').then((m) => ({ Component: m.RemotePeers }))
}
]
}
] satisfies RouteObject[];

View file

@ -1,37 +0,0 @@
import { useNavigate } from 'react-router';
import { useCache, useDiscoveredPeers, useLibraryQuery, useNodes } from '@sd/client';
import { Button } from '@sd/ui';
export const Component = () => {
// TODO: Handle if P2P is disabled
return (
<div className="p-4">
<PeerSelector />
</div>
);
};
function PeerSelector() {
const peers = useDiscoveredPeers();
const navigate = useNavigate();
return (
<>
<h1>Nodes:</h1>
{peers.size === 0 ? (
<p>No peers found...</p>
) : (
<ul>
{[...peers.entries()].map(([id, _node]) => (
<li key={id}>
{id}
<Button onClick={() => navigate(`/remote/${id}/browse`)}>
Open Library Browser
</Button>
</li>
))}
</ul>
)}
</>
);
}

View file

@ -1,3 +1,5 @@
import { useState } from 'react';
import { Outlet, useNavigate } from 'react-router';
import {
useBridgeMutation,
useBridgeQuery,
@ -7,23 +9,36 @@ import {
useNodes
} from '@sd/client';
import { Button, toast } from '@sd/ui';
import { useZodRouteParams, useZodSearchParams } from '~/hooks';
export const Component = () => {
const node = useBridgeQuery(['nodeState']);
const navigate = useNavigate();
// TODO: Handle if P2P is disabled
// const node = useBridgeQuery(['nodeState']);
// {node.data?.p2p_enabled === false ? (
// <h1 className="text-red-500">P2P is disabled. Please enable it in settings!</h1>
// ) : (
// <Page />
// )}
return (
<div className="p-4">
{/* {node.data?.p2p_enabled === false ? (
<h1 className="text-red-500">P2P is disabled. Please enable it in settings!</h1>
) : (
<Page />
)} */}
<Page />
<div>
<div className="flex space-x-4">
<Button variant="accent" onClick={() => navigate('overview')}>
Overview
</Button>
<Button variant="accent" onClick={() => navigate('remote')}>
Remote Peers
</Button>
</div>
<div className="p-4">
<Outlet />
</div>
</div>
);
};
function Page() {
export function Overview() {
const p2pState = useBridgeQuery(['p2p.state'], {
refetchInterval: 1000
});
@ -93,3 +108,28 @@ function Page() {
</div>
);
}
export function RemotePeers() {
const peers = useDiscoveredPeers();
const navigate = useNavigate();
return (
<>
<h1>Nodes:</h1>
{peers.size === 0 ? (
<p>No peers found...</p>
) : (
<ul>
{[...peers.entries()].map(([id, _node]) => (
<li key={id}>
{id}
<Button onClick={() => navigate(`/remote/${id}/browse`)}>
Open Library Browser
</Button>
</li>
))}
</ul>
)}
</>
);
}