rspc over P2P (#2112)

* wip: rspc over p2p

* wip

* rspc over P2P

* Cleanup + error handling

* slight cleanup

* Using Hyper for HTTP streaming + websockets
This commit is contained in:
Oscar Beaumont 2024-02-26 15:23:48 +08:00 committed by GitHub
parent f7a7b00e37
commit aa0b4abf85
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 258 additions and 18 deletions

2
Cargo.lock generated
View file

@ -7692,6 +7692,7 @@ dependencies = [
"hostname",
"http-body",
"http-range",
"hyper",
"image",
"int-enum",
"itertools 0.12.0",
@ -7744,6 +7745,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tower-service",
"tracing",
"tracing-appender",
"tracing-subscriber",

View file

@ -13,6 +13,7 @@ use axum::{
response::Response,
RequestPartsExt,
};
use http::Method;
use hyper::server::{accept::Accept, conn::AddrIncoming};
use rand::{distributions::Alphanumeric, Rng};
use sd_core::{custom_uri, Node, NodeError};
@ -116,7 +117,7 @@ pub async fn sd_server_plugin<R: Runtime>(
#[derive(Deserialize)]
struct QueryParams {
token: String,
token: Option<String>,
}
async fn auth_middleware<B>(
@ -128,16 +129,19 @@ async fn auth_middleware<B>(
where
B: Send,
{
let req = if query.token != auth_token {
let req = if query.token.as_ref() != Some(&auth_token) {
let (mut parts, body) = request.into_parts();
let auth: TypedHeader<Authorization<Bearer>> = parts
.extract()
.await
.map_err(|_| StatusCode::UNAUTHORIZED)?;
// We don't check auth for OPTIONS requests cause the CORS middleware will handle it
if parts.method != Method::OPTIONS {
let auth: TypedHeader<Authorization<Bearer>> = parts
.extract()
.await
.map_err(|_| StatusCode::UNAUTHORIZED)?;
if auth.token() != auth_token {
return Err(StatusCode::UNAUTHORIZED);
if auth.token() != auth_token {
return Err(StatusCode::UNAUTHORIZED);
}
}
Request::from_parts(parts, body)

View file

@ -53,6 +53,12 @@ export const platform = {
constructServerUrl(`/file/${libraryId}/${locationLocalId}/${filePathId}`),
getFileUrlByPath: (path) =>
constructServerUrl(`/local-file-by-path/${encodeURIComponent(path)}`),
getRemoteRspcEndpoint: (remote_identity) => ({
url: `${customUriServerUrl?.[0]}/remote/${encodeURIComponent(remote_identity)}/rspc`,
headers: {
authorization: `Bearer ${customUriAuthToken}`
}
}),
openLink: shell.open,
getOs,
openDirectoryPickerDialog: (opts) => {

View file

@ -49,6 +49,9 @@ const platform: Platform = {
locationLocalId
)}/${encodeURIComponent(filePathId)}`,
getFileUrlByPath: (path) => `${spacedriveURL}/local-file-by-path/${encodeURIComponent(path)}`,
getRemoteRspcEndpoint: (remote_identity) => ({
url: `${spacedriveURL}/remote/${encodeURIComponent(remote_identity)}/rspc`
}),
openLink: (url) => window.open(url, '_blank')?.focus(),
confirm: (message, cb) => cb(window.confirm(message)),
auth: {

View file

@ -66,6 +66,7 @@ regex = { workspace = true }
reqwest = { workspace = true, features = ["json", "native-tls-vendored"] }
rmp-serde = { workspace = true }
rspc = { workspace = true, features = [
"axum",
"uuid",
"chrono",
"tracing",
@ -127,6 +128,8 @@ aws-config = "1.0.3"
aws-credential-types = "1.0.3"
base91 = "0.1.0"
sd-actors = { version = "0.1.0", path = "../crates/actors" }
tower-service = "0.3.2"
hyper = { version = "=0.14.28", features = ["http1", "server", "client"] }
# Override features of transitive dependencies
[dependencies.openssl]

View file

@ -238,7 +238,9 @@ pub(crate) fn mount() -> Arc<Router> {
<sd_prisma::prisma::object::Data as specta::NamedType>::SID,
def,
);
})
});
let r = r
.build(
#[allow(clippy::let_and_return)]
{

View file

@ -28,8 +28,9 @@ use async_stream::stream;
use axum::{
body::{self, Body, BoxBody, Full, StreamBody},
extract::{self, State},
http::{HeaderValue, Request, Response, StatusCode},
http::{HeaderMap, HeaderValue, Request, Response, StatusCode},
middleware,
response::IntoResponse,
routing::get,
Router,
};
@ -320,6 +321,36 @@ pub fn router(node: Arc<Node>) -> Router<()> {
},
),
)
.route(
"/remote/:identity/rspc/*path",
get(
|State(state): State<LocalState>,
extract::Path((identity, rest)): extract::Path<(String, String)>,
mut request: Request<Body>| async move {
let identity = match RemoteIdentity::from_str(&identity) {
Ok(identity) => identity,
Err(err) => {
error!("Error parsing identity '{}': {}", identity, err);
return (StatusCode::BAD_REQUEST, HeaderMap::new(), vec![])
.into_response();
}
};
*request.uri_mut() = format!("/{rest}")
.parse()
.expect("url was validated by Axum");
match operations::remote_rspc(state.node.p2p.p2p.clone(), identity, request)
.await
{
Ok(response) => response.into_response(),
Err(err) => {
error!("Error doing remote rspc query with '{identity}': {err:?}");
(StatusCode::INTERNAL_SERVER_ERROR, HeaderMap::new()).into_response()
}
}
},
),
)
.route_layer(middleware::from_fn(cors_middleware))
.with_state({
let file_metadata_cache = Arc::new(Cache::new(150));

View file

@ -158,13 +158,23 @@ impl Node {
init_data.apply(&node.libraries, &node).await?;
}
let router = api::mount();
// Be REALLY careful about ordering here or you'll get unreliable deadlock's!
locations_actor.start(node.clone());
node.libraries.init(&node).await?;
jobs_actor.start(node.clone());
start_p2p(node.clone());
let router = api::mount();
start_p2p(
node.clone(),
router
.clone()
.endpoint({
let node = node.clone();
move |_| node.clone()
})
.axum::<()>()
.into_make_service(),
);
info!("Spacedrive online.");
Ok((node, router))

View file

@ -7,6 +7,8 @@ use crate::{
Node,
};
use axum::routing::IntoMakeService;
use sd_p2p2::{
flume::{bounded, Receiver},
Libp2pPeerId, Listener, Mdns, Peer, QuicTransport, RemoteIdentity, UnicastStream, P2P,
@ -17,12 +19,15 @@ use serde_json::json;
use specta::Type;
use std::{
collections::{HashMap, HashSet},
convert::Infallible,
net::SocketAddr,
sync::{atomic::AtomicBool, Arc, Mutex, PoisonError},
};
use tower_service::Service;
use tracing::error;
use tokio::sync::oneshot;
use tracing::{error, info};
use tracing::info;
use uuid::Uuid;
use super::{P2PEvents, PeerMetadata};
@ -44,7 +49,13 @@ impl P2PManager {
pub async fn new(
node_config: Arc<config::Manager>,
libraries: Arc<crate::library::Libraries>,
) -> Result<(Arc<P2PManager>, impl FnOnce(Arc<Node>)), String> {
) -> Result<
(
Arc<P2PManager>,
impl FnOnce(Arc<Node>, IntoMakeService<axum::Router<()>>),
),
String,
> {
let (tx, rx) = bounded(25);
let p2p = P2P::new(SPACEDRIVE_APP_ID, node_config.get().await.identity, tx);
let (quic, lp2p_peer_id) = QuicTransport::spawn(p2p.clone())?;
@ -70,8 +81,8 @@ impl P2PManager {
this.p2p.listeners()
);
Ok((this.clone(), |node| {
tokio::spawn(start(this, node, rx));
Ok((this.clone(), |node, router| {
tokio::spawn(start(this, node, rx, router));
}))
}
@ -220,10 +231,13 @@ async fn start(
this: Arc<P2PManager>,
node: Arc<Node>,
rx: Receiver<UnicastStream>,
mut service: IntoMakeService<axum::Router<()>>,
) -> Result<(), ()> {
while let Ok(mut stream) = rx.recv_async().await {
let this = this.clone();
let node = node.clone();
let mut service = unwrap_infallible(service.call(()).await);
tokio::spawn(async move {
println!("APPLICATION GOT STREAM: {:?}", stream); // TODO
@ -286,6 +300,14 @@ async fn start(
error!("Failed to handle file request");
}
Header::Http => {
let remote = stream.remote_identity();
let Err(err) = operations::rspc::receiver(stream, &mut service).await else {
return;
};
error!("Failed to handling rspc request with '{remote}': {err:?}");
}
};
});
}
@ -309,3 +331,10 @@ pub fn into_listener2(l: &[Listener]) -> Vec<Listener2> {
})
.collect()
}
fn unwrap_infallible<T>(result: Result<T, Infallible>) -> T {
match result {
Ok(value) => value,
Err(err) => match err {},
}
}

View file

@ -1,6 +1,8 @@
pub mod ping;
pub mod request_file;
pub mod rspc;
pub mod spacedrop;
pub use request_file::request_file;
pub use rspc::remote_rspc;
pub use spacedrop::spacedrop;

View file

@ -0,0 +1,53 @@
use std::{error::Error, sync::Arc};
use axum::{body::Body, http, Router};
use hyper::{server::conn::Http, Response};
use sd_p2p2::{RemoteIdentity, UnicastStream, P2P};
use tokio::io::AsyncWriteExt;
use tracing::debug;
use crate::p2p::Header;
/// Transfer an rspc query to a remote node.
#[allow(unused)]
pub async fn remote_rspc(
p2p: Arc<P2P>,
identity: RemoteIdentity,
request: http::Request<axum::body::Body>,
) -> Result<Response<Body>, Box<dyn Error>> {
let peer = p2p
.peers()
.get(&identity)
.ok_or("Peer not found, has it been discovered?")?
.clone();
let mut stream = peer.new_stream().await?;
stream.write_all(&Header::Http.to_bytes()).await?;
let (mut sender, conn) = hyper::client::conn::handshake(stream).await?;
tokio::task::spawn(async move {
if let Err(err) = conn.await {
println!("Connection error: {:?}", err);
}
});
sender.send_request(request).await.map_err(Into::into)
}
pub(crate) async fn receiver(
stream: UnicastStream,
service: &mut Router,
) -> Result<(), Box<dyn Error>> {
debug!(
"Received http request from peer '{}'",
stream.remote_identity(),
);
Http::new()
.http1_only(true)
.http1_keep_alive(true)
.serve_connection(stream, service)
.with_upgrades()
.await
.map_err(Into::into)
}

View file

@ -21,6 +21,8 @@ pub enum Header {
Spacedrop(SpaceblockRequests),
Sync(Uuid),
File(HeaderFile),
// A HTTP server used for rspc requests and streaming files
Http,
}
#[derive(Debug, Error)]
@ -86,6 +88,7 @@ impl Header {
i => return Err(HeaderError::HeaderFileDiscriminatorInvalid(i)),
},
})),
5 => Ok(Self::Http),
d => Err(HeaderError::DiscriminatorInvalid(d)),
}
}
@ -116,6 +119,7 @@ impl Header {
buf.extend_from_slice(&range.to_bytes());
buf
}
Self::Http => vec![5],
}
}
}

View file

@ -33,6 +33,10 @@ export default function DebugSection() {
<Icon component={ShareNetwork} />
P2P
</SidebarLink>
<SidebarLink to="debug/p2p-rspc">
<Icon component={ShareNetwork} />
P2P (rspc)
</SidebarLink>
</div>
</Section>
);

View file

@ -5,5 +5,6 @@ export const debugRoutes = [
{ path: 'cloud', lazy: () => import('./cloud') },
{ path: 'sync', lazy: () => import('./sync') },
{ path: 'actors', lazy: () => import('./actors') },
{ path: 'p2p', lazy: () => import('./p2p') }
{ path: 'p2p', lazy: () => import('./p2p') },
{ path: 'p2p-rspc', lazy: () => import('./p2p-rspc') }
] satisfies RouteObject[];

View file

@ -0,0 +1,82 @@
import { httpLink, initRspc, type AlphaClient } from '@oscartbeaumont-sd/rspc-client/v2';
import { useEffect, useRef, useState } from 'react';
import { useDiscoveredPeers, type Procedures } from '@sd/client';
import { Button } from '@sd/ui';
import { usePlatform } from '~/util/Platform';
export const Component = () => {
// TODO: Handle if P2P is disabled
const [activePeer, setActivePeer] = useState<string | null>(null);
return (
<div className="p-4">
{activePeer ? (
<P2PInfo peer={activePeer} />
) : (
<PeerSelector setActivePeer={setActivePeer} />
)}
</div>
);
};
function PeerSelector({ setActivePeer }: { setActivePeer: (peer: string) => void }) {
const peers = useDiscoveredPeers();
return (
<>
<h1>Nodes:</h1>
{peers.size === 0 ? (
<p>No peers found...</p>
) : (
<ul>
{[...peers.entries()].map(([id, _node]) => (
<li key={id}>
{id}
<Button onClick={() => setActivePeer(id)}>Connect</Button>
</li>
))}
</ul>
)}
</>
);
}
function P2PInfo({ peer }: { peer: string }) {
const platform = usePlatform();
const ref = useRef<AlphaClient<Procedures>>();
const [result, setResult] = useState('');
useEffect(() => {
// TODO: Cleanup when URL changed
const endpoint = platform.getRemoteRspcEndpoint(peer);
ref.current = initRspc<Procedures>({
links: [
httpLink({
url: endpoint.url,
headers: endpoint.headers
})
]
});
}, [peer]);
useEffect(() => {
if (!ref.current) return;
ref.current.query(['nodeState']).then((data) => setResult(JSON.stringify(data, null, 2)));
}, [ref, result]);
return (
<div className="flex flex-col">
<h1>Connected with: {peer}</h1>
<Button
onClick={() => {
ref.current
?.query(['nodeState'])
.then((data) => setResult(JSON.stringify(data, null, 2)));
}}
>
Refetch
</Button>
<pre>{result}</pre>
</div>
);
}

View file

@ -17,6 +17,10 @@ export type Platform = {
getThumbnailUrlByThumbKey: (thumbKey: string[]) => string;
getFileUrl: (libraryId: string, locationLocalId: number, filePathId: number) => string;
getFileUrlByPath: (path: string) => string;
getRemoteRspcEndpoint: (remote_identity: string) => {
url: string;
headers?: Record<string, string>;
};
openLink: (url: string) => void;
// Tauri patches `window.confirm` to return `Promise` not `bool`
confirm(msg: string, cb: (result: boolean) => void): void;