diff --git a/Cargo.lock b/Cargo.lock index 72bbfc55d..f70073d49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7692,6 +7692,7 @@ dependencies = [ "hostname", "http-body", "http-range", + "hyper", "image", "int-enum", "itertools 0.12.0", @@ -7744,6 +7745,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "tower-service", "tracing", "tracing-appender", "tracing-subscriber", diff --git a/apps/desktop/src-tauri/src/tauri_plugins.rs b/apps/desktop/src-tauri/src/tauri_plugins.rs index 9d483fe81..6ec861ce7 100644 --- a/apps/desktop/src-tauri/src/tauri_plugins.rs +++ b/apps/desktop/src-tauri/src/tauri_plugins.rs @@ -13,6 +13,7 @@ use axum::{ response::Response, RequestPartsExt, }; +use http::Method; use hyper::server::{accept::Accept, conn::AddrIncoming}; use rand::{distributions::Alphanumeric, Rng}; use sd_core::{custom_uri, Node, NodeError}; @@ -116,7 +117,7 @@ pub async fn sd_server_plugin( #[derive(Deserialize)] struct QueryParams { - token: String, + token: Option, } async fn auth_middleware( @@ -128,16 +129,19 @@ async fn auth_middleware( where B: Send, { - let req = if query.token != auth_token { + let req = if query.token.as_ref() != Some(&auth_token) { let (mut parts, body) = request.into_parts(); - let auth: TypedHeader> = parts - .extract() - .await - .map_err(|_| StatusCode::UNAUTHORIZED)?; + // We don't check auth for OPTIONS requests cause the CORS middleware will handle it + if parts.method != Method::OPTIONS { + let auth: TypedHeader> = parts + .extract() + .await + .map_err(|_| StatusCode::UNAUTHORIZED)?; - if auth.token() != auth_token { - return Err(StatusCode::UNAUTHORIZED); + if auth.token() != auth_token { + return Err(StatusCode::UNAUTHORIZED); + } } Request::from_parts(parts, body) diff --git a/apps/desktop/src/platform.ts b/apps/desktop/src/platform.ts index e9b9bf440..67c2bb2cc 100644 --- a/apps/desktop/src/platform.ts +++ b/apps/desktop/src/platform.ts @@ -53,6 +53,12 @@ export const platform = { constructServerUrl(`/file/${libraryId}/${locationLocalId}/${filePathId}`), getFileUrlByPath: (path) => constructServerUrl(`/local-file-by-path/${encodeURIComponent(path)}`), + getRemoteRspcEndpoint: (remote_identity) => ({ + url: `${customUriServerUrl?.[0]}/remote/${encodeURIComponent(remote_identity)}/rspc`, + headers: { + authorization: `Bearer ${customUriAuthToken}` + } + }), openLink: shell.open, getOs, openDirectoryPickerDialog: (opts) => { diff --git a/apps/web/src/App.tsx b/apps/web/src/App.tsx index 14ceddc46..e4492e238 100644 --- a/apps/web/src/App.tsx +++ b/apps/web/src/App.tsx @@ -49,6 +49,9 @@ const platform: Platform = { locationLocalId )}/${encodeURIComponent(filePathId)}`, getFileUrlByPath: (path) => `${spacedriveURL}/local-file-by-path/${encodeURIComponent(path)}`, + getRemoteRspcEndpoint: (remote_identity) => ({ + url: `${spacedriveURL}/remote/${encodeURIComponent(remote_identity)}/rspc` + }), openLink: (url) => window.open(url, '_blank')?.focus(), confirm: (message, cb) => cb(window.confirm(message)), auth: { diff --git a/core/Cargo.toml b/core/Cargo.toml index b1ed15311..79f32bd59 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -66,6 +66,7 @@ regex = { workspace = true } reqwest = { workspace = true, features = ["json", "native-tls-vendored"] } rmp-serde = { workspace = true } rspc = { workspace = true, features = [ + "axum", "uuid", "chrono", "tracing", @@ -127,6 +128,8 @@ aws-config = "1.0.3" aws-credential-types = "1.0.3" base91 = "0.1.0" sd-actors = { version = "0.1.0", path = "../crates/actors" } +tower-service = "0.3.2" +hyper = { version = "=0.14.28", features = ["http1", "server", "client"] } # Override features of transitive dependencies [dependencies.openssl] diff --git a/core/src/api/mod.rs b/core/src/api/mod.rs index 903a22641..5d24fb611 100644 --- a/core/src/api/mod.rs +++ b/core/src/api/mod.rs @@ -238,7 +238,9 @@ pub(crate) fn mount() -> Arc { ::SID, def, ); - }) + }); + + let r = r .build( #[allow(clippy::let_and_return)] { diff --git a/core/src/custom_uri/mod.rs b/core/src/custom_uri/mod.rs index 19ef0607b..29811893d 100644 --- a/core/src/custom_uri/mod.rs +++ b/core/src/custom_uri/mod.rs @@ -28,8 +28,9 @@ use async_stream::stream; use axum::{ body::{self, Body, BoxBody, Full, StreamBody}, extract::{self, State}, - http::{HeaderValue, Request, Response, StatusCode}, + http::{HeaderMap, HeaderValue, Request, Response, StatusCode}, middleware, + response::IntoResponse, routing::get, Router, }; @@ -320,6 +321,36 @@ pub fn router(node: Arc) -> Router<()> { }, ), ) + .route( + "/remote/:identity/rspc/*path", + get( + |State(state): State, + extract::Path((identity, rest)): extract::Path<(String, String)>, + mut request: Request| async move { + let identity = match RemoteIdentity::from_str(&identity) { + Ok(identity) => identity, + Err(err) => { + error!("Error parsing identity '{}': {}", identity, err); + return (StatusCode::BAD_REQUEST, HeaderMap::new(), vec![]) + .into_response(); + } + }; + *request.uri_mut() = format!("/{rest}") + .parse() + .expect("url was validated by Axum"); + + match operations::remote_rspc(state.node.p2p.p2p.clone(), identity, request) + .await + { + Ok(response) => response.into_response(), + Err(err) => { + error!("Error doing remote rspc query with '{identity}': {err:?}"); + (StatusCode::INTERNAL_SERVER_ERROR, HeaderMap::new()).into_response() + } + } + }, + ), + ) .route_layer(middleware::from_fn(cors_middleware)) .with_state({ let file_metadata_cache = Arc::new(Cache::new(150)); diff --git a/core/src/lib.rs b/core/src/lib.rs index 0b263ba39..d6d22e458 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -158,13 +158,23 @@ impl Node { init_data.apply(&node.libraries, &node).await?; } + let router = api::mount(); + // Be REALLY careful about ordering here or you'll get unreliable deadlock's! locations_actor.start(node.clone()); node.libraries.init(&node).await?; jobs_actor.start(node.clone()); - start_p2p(node.clone()); - - let router = api::mount(); + start_p2p( + node.clone(), + router + .clone() + .endpoint({ + let node = node.clone(); + move |_| node.clone() + }) + .axum::<()>() + .into_make_service(), + ); info!("Spacedrive online."); Ok((node, router)) diff --git a/core/src/p2p/manager.rs b/core/src/p2p/manager.rs index 0e4daeae0..9e67ac952 100644 --- a/core/src/p2p/manager.rs +++ b/core/src/p2p/manager.rs @@ -7,6 +7,8 @@ use crate::{ Node, }; +use axum::routing::IntoMakeService; + use sd_p2p2::{ flume::{bounded, Receiver}, Libp2pPeerId, Listener, Mdns, Peer, QuicTransport, RemoteIdentity, UnicastStream, P2P, @@ -17,12 +19,15 @@ use serde_json::json; use specta::Type; use std::{ collections::{HashMap, HashSet}, + convert::Infallible, net::SocketAddr, sync::{atomic::AtomicBool, Arc, Mutex, PoisonError}, }; +use tower_service::Service; +use tracing::error; use tokio::sync::oneshot; -use tracing::{error, info}; +use tracing::info; use uuid::Uuid; use super::{P2PEvents, PeerMetadata}; @@ -44,7 +49,13 @@ impl P2PManager { pub async fn new( node_config: Arc, libraries: Arc, - ) -> Result<(Arc, impl FnOnce(Arc)), String> { + ) -> Result< + ( + Arc, + impl FnOnce(Arc, IntoMakeService>), + ), + String, + > { let (tx, rx) = bounded(25); let p2p = P2P::new(SPACEDRIVE_APP_ID, node_config.get().await.identity, tx); let (quic, lp2p_peer_id) = QuicTransport::spawn(p2p.clone())?; @@ -70,8 +81,8 @@ impl P2PManager { this.p2p.listeners() ); - Ok((this.clone(), |node| { - tokio::spawn(start(this, node, rx)); + Ok((this.clone(), |node, router| { + tokio::spawn(start(this, node, rx, router)); })) } @@ -220,10 +231,13 @@ async fn start( this: Arc, node: Arc, rx: Receiver, + mut service: IntoMakeService>, ) -> Result<(), ()> { while let Ok(mut stream) = rx.recv_async().await { let this = this.clone(); let node = node.clone(); + let mut service = unwrap_infallible(service.call(()).await); + tokio::spawn(async move { println!("APPLICATION GOT STREAM: {:?}", stream); // TODO @@ -286,6 +300,14 @@ async fn start( error!("Failed to handle file request"); } + Header::Http => { + let remote = stream.remote_identity(); + let Err(err) = operations::rspc::receiver(stream, &mut service).await else { + return; + }; + + error!("Failed to handling rspc request with '{remote}': {err:?}"); + } }; }); } @@ -309,3 +331,10 @@ pub fn into_listener2(l: &[Listener]) -> Vec { }) .collect() } + +fn unwrap_infallible(result: Result) -> T { + match result { + Ok(value) => value, + Err(err) => match err {}, + } +} diff --git a/core/src/p2p/operations/mod.rs b/core/src/p2p/operations/mod.rs index 2083c64f8..adbaaa809 100644 --- a/core/src/p2p/operations/mod.rs +++ b/core/src/p2p/operations/mod.rs @@ -1,6 +1,8 @@ pub mod ping; pub mod request_file; +pub mod rspc; pub mod spacedrop; pub use request_file::request_file; +pub use rspc::remote_rspc; pub use spacedrop::spacedrop; diff --git a/core/src/p2p/operations/rspc.rs b/core/src/p2p/operations/rspc.rs new file mode 100644 index 000000000..f40db45d3 --- /dev/null +++ b/core/src/p2p/operations/rspc.rs @@ -0,0 +1,53 @@ +use std::{error::Error, sync::Arc}; + +use axum::{body::Body, http, Router}; +use hyper::{server::conn::Http, Response}; +use sd_p2p2::{RemoteIdentity, UnicastStream, P2P}; +use tokio::io::AsyncWriteExt; +use tracing::debug; + +use crate::p2p::Header; + +/// Transfer an rspc query to a remote node. +#[allow(unused)] +pub async fn remote_rspc( + p2p: Arc, + identity: RemoteIdentity, + request: http::Request, +) -> Result, Box> { + let peer = p2p + .peers() + .get(&identity) + .ok_or("Peer not found, has it been discovered?")? + .clone(); + let mut stream = peer.new_stream().await?; + + stream.write_all(&Header::Http.to_bytes()).await?; + + let (mut sender, conn) = hyper::client::conn::handshake(stream).await?; + tokio::task::spawn(async move { + if let Err(err) = conn.await { + println!("Connection error: {:?}", err); + } + }); + + sender.send_request(request).await.map_err(Into::into) +} + +pub(crate) async fn receiver( + stream: UnicastStream, + service: &mut Router, +) -> Result<(), Box> { + debug!( + "Received http request from peer '{}'", + stream.remote_identity(), + ); + + Http::new() + .http1_only(true) + .http1_keep_alive(true) + .serve_connection(stream, service) + .with_upgrades() + .await + .map_err(Into::into) +} diff --git a/core/src/p2p/protocol.rs b/core/src/p2p/protocol.rs index ff9a89c24..6948befeb 100644 --- a/core/src/p2p/protocol.rs +++ b/core/src/p2p/protocol.rs @@ -21,6 +21,8 @@ pub enum Header { Spacedrop(SpaceblockRequests), Sync(Uuid), File(HeaderFile), + // A HTTP server used for rspc requests and streaming files + Http, } #[derive(Debug, Error)] @@ -86,6 +88,7 @@ impl Header { i => return Err(HeaderError::HeaderFileDiscriminatorInvalid(i)), }, })), + 5 => Ok(Self::Http), d => Err(HeaderError::DiscriminatorInvalid(d)), } } @@ -116,6 +119,7 @@ impl Header { buf.extend_from_slice(&range.to_bytes()); buf } + Self::Http => vec![5], } } } diff --git a/interface/app/$libraryId/Layout/Sidebar/sections/Debug/index.tsx b/interface/app/$libraryId/Layout/Sidebar/sections/Debug/index.tsx index 80437b187..dcb9bde54 100644 --- a/interface/app/$libraryId/Layout/Sidebar/sections/Debug/index.tsx +++ b/interface/app/$libraryId/Layout/Sidebar/sections/Debug/index.tsx @@ -33,6 +33,10 @@ export default function DebugSection() { P2P + + + P2P (rspc) + ); diff --git a/interface/app/$libraryId/debug/index.ts b/interface/app/$libraryId/debug/index.ts index 37c7c39e5..3bedc83b0 100644 --- a/interface/app/$libraryId/debug/index.ts +++ b/interface/app/$libraryId/debug/index.ts @@ -5,5 +5,6 @@ export const debugRoutes = [ { path: 'cloud', lazy: () => import('./cloud') }, { path: 'sync', lazy: () => import('./sync') }, { path: 'actors', lazy: () => import('./actors') }, - { path: 'p2p', lazy: () => import('./p2p') } + { path: 'p2p', lazy: () => import('./p2p') }, + { path: 'p2p-rspc', lazy: () => import('./p2p-rspc') } ] satisfies RouteObject[]; diff --git a/interface/app/$libraryId/debug/p2p-rspc.tsx b/interface/app/$libraryId/debug/p2p-rspc.tsx new file mode 100644 index 000000000..36e7aa43c --- /dev/null +++ b/interface/app/$libraryId/debug/p2p-rspc.tsx @@ -0,0 +1,82 @@ +import { httpLink, initRspc, type AlphaClient } from '@oscartbeaumont-sd/rspc-client/v2'; +import { useEffect, useRef, useState } from 'react'; +import { useDiscoveredPeers, type Procedures } from '@sd/client'; +import { Button } from '@sd/ui'; +import { usePlatform } from '~/util/Platform'; + +export const Component = () => { + // TODO: Handle if P2P is disabled + const [activePeer, setActivePeer] = useState(null); + + return ( +
+ {activePeer ? ( + + ) : ( + + )} +
+ ); +}; + +function PeerSelector({ setActivePeer }: { setActivePeer: (peer: string) => void }) { + const peers = useDiscoveredPeers(); + + return ( + <> +

Nodes:

+ {peers.size === 0 ? ( +

No peers found...

+ ) : ( +
    + {[...peers.entries()].map(([id, _node]) => ( +
  • + {id} + +
  • + ))} +
+ )} + + ); +} + +function P2PInfo({ peer }: { peer: string }) { + const platform = usePlatform(); + const ref = useRef>(); + const [result, setResult] = useState(''); + useEffect(() => { + // TODO: Cleanup when URL changed + const endpoint = platform.getRemoteRspcEndpoint(peer); + ref.current = initRspc({ + links: [ + httpLink({ + url: endpoint.url, + headers: endpoint.headers + }) + ] + }); + }, [peer]); + + useEffect(() => { + if (!ref.current) return; + ref.current.query(['nodeState']).then((data) => setResult(JSON.stringify(data, null, 2))); + }, [ref, result]); + + return ( +
+

Connected with: {peer}

+ + +
{result}
+
+ ); +} diff --git a/interface/util/Platform.tsx b/interface/util/Platform.tsx index 103c55618..71f1b6227 100644 --- a/interface/util/Platform.tsx +++ b/interface/util/Platform.tsx @@ -17,6 +17,10 @@ export type Platform = { getThumbnailUrlByThumbKey: (thumbKey: string[]) => string; getFileUrl: (libraryId: string, locationLocalId: number, filePathId: number) => string; getFileUrlByPath: (path: string) => string; + getRemoteRspcEndpoint: (remote_identity: string) => { + url: string; + headers?: Record; + }; openLink: (url: string) => void; // Tauri patches `window.confirm` to return `Promise` not `bool` confirm(msg: string, cb: (result: boolean) => void): void;