Serve files over p2p (#2523)

* serve files over p2p

* include location instance id in sync

* Fix P2P addressing?

---------

Co-authored-by: Brendan Allan <brendonovich@outlook.com>
This commit is contained in:
Oscar Beaumont 2024-05-30 22:07:28 +08:00 committed by GitHub
parent b015763a6f
commit 0392c781d7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 384 additions and 60 deletions

2
Cargo.lock generated
View file

@ -9546,8 +9546,10 @@ name = "sd-p2p-tunnel"
version = "0.1.0"
dependencies = [
"sd-p2p",
"sd-p2p-proto",
"thiserror",
"tokio",
"uuid",
]
[[package]]

View file

@ -112,6 +112,7 @@ file_path::select!(file_path_to_handle_custom_uri {
instance: select {
identity
remote_identity
node_remote_identity
}
}
});

View file

@ -70,6 +70,12 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
.find_many(vec![location::id::gt(cursor)])
.order_by(location::id::order(SortOrder::Asc))
.take(1000)
.include(location::include!({
instance: select {
id
pub_id
}
}))
.exec()
},
|location| location.id,
@ -108,6 +114,14 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
),
option_sync_entry!(l.hidden, hidden),
option_sync_entry!(l.date_created, date_created),
option_sync_entry!(
l.instance.map(|i| {
prisma_sync::instance::SyncId {
pub_id: i.pub_id,
}
}),
instance
),
],
),
)

View file

@ -68,7 +68,7 @@ model Node {
// represents a single `.db` file (SQLite DB) that is paired to the current library.
// A `LibraryInstance` is always owned by a single `Node` but it's possible for that node to change (or two to be owned by a single node).
/// @local
/// @local(id: pub_id)
model Instance {
id Int @id @default(autoincrement()) // This is is NOT globally unique
pub_id Bytes @unique // This UUID is meaningless and exists soley cause the `uhlc::ID` must be 16-bit. Really this should be derived from the `identity` field.
@ -148,8 +148,7 @@ model Location {
scan_state Int @default(0) // Enum: sd_core::location::ScanState
/// @local
// this is just a client side cache which is annoying but oh well (@brendan)
// this should just be a local-only cache but it's too much effort to broadcast online locations rn (@brendan)
instance_id Int?
instance Instance? @relation(fields: [instance_id], references: [id], onDelete: SetNull)
@ -627,7 +626,7 @@ model IndexerRule {
id Int @id @default(autoincrement())
pub_id Bytes @unique
name String? @unique
name String? @unique
default Boolean?
rules_per_kind Bytes?
date_created DateTime?

View file

@ -2,18 +2,23 @@ use crate::{
api::{utils::InvalidateOperationEvent, CoreEvent},
library::Library,
object::media::old_thumbnail::WEBP_EXTENSION,
p2p::operations,
p2p::operations::{self, request_file},
util::InfallibleResponse,
Node,
};
use async_stream::stream;
use bytes::Bytes;
use mpsc_to_async_write::MpscToAsyncWrite;
use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_prisma_helpers::file_path_to_handle_custom_uri;
use sd_file_ext::text::is_text;
use sd_p2p::{RemoteIdentity, P2P};
use sd_p2p_block::Range;
use sd_prisma::prisma::{file_path, location};
use sd_utils::db::maybe_missing;
use tokio_util::sync::PollSender;
use std::{
cmp::min,
@ -26,7 +31,7 @@ use std::{
};
use axum::{
body::{self, Body, BoxBody, Full},
body::{self, Body, BoxBody, Full, StreamBody},
extract::{self, State},
http::{HeaderMap, HeaderValue, Request, Response, StatusCode},
middleware,
@ -68,7 +73,11 @@ pub enum ServeFrom {
/// Serve from the local filesystem
Local,
/// Serve from a specific instance
Remote(RemoteIdentity),
Remote {
library_identity: RemoteIdentity,
node_identity: RemoteIdentity,
library: Arc<Library>,
},
}
#[derive(Clone)]
@ -176,17 +185,29 @@ async fn get_or_init_lru_entry(
let path = Path::new(path)
.join(IsolatedFilePathData::try_from((location_id, &file_path)).map_err(not_found)?);
let identity =
let library_identity =
RemoteIdentity::from_bytes(&instance.remote_identity).map_err(internal_server_error)?;
let node_identity = RemoteIdentity::from_bytes(
instance
.node_remote_identity
.as_ref()
.expect("node_remote_identity is required"),
)
.map_err(internal_server_error)?;
let lru_entry = CacheValue {
name: path,
ext: maybe_missing(file_path.extension, "extension").map_err(not_found)?,
file_path_pub_id: Uuid::from_slice(&file_path.pub_id).map_err(internal_server_error)?,
serve_from: if identity == library.identity.to_remote_identity() {
serve_from: if library_identity == library.identity.to_remote_identity() {
ServeFrom::Local
} else {
ServeFrom::Remote(identity)
ServeFrom::Remote {
library_identity,
node_identity,
library: library.clone(),
}
},
};
@ -240,19 +261,16 @@ pub fn base_router() -> Router<LocalState> {
.route(
"/file/:lib_id/:loc_id/:path_id",
get(
|State(state): State<LocalState>,
path: ExtractedPath,
mut request: Request<Body>| async move {
let part_parts = path.0.clone();
|State(state): State<LocalState>, path: ExtractedPath, request: Request<Body>| async move {
let (
CacheValue {
name: file_path_full_path,
ext: extension,
file_path_pub_id: _file_path_pub_id,
file_path_pub_id,
serve_from,
..
},
_library,
library,
) = get_or_init_lru_entry(&state, path).await?;
match serve_from {
@ -288,18 +306,37 @@ pub fn base_router() -> Router<LocalState> {
serve_file(file, Ok(metadata), request.into_parts().0, resp).await
}
ServeFrom::Remote(identity) => {
*request.uri_mut() =
format!("/file/{}/{}/{}", part_parts.0, part_parts.1, part_parts.2)
.parse()
.expect("url was validated by Axum");
ServeFrom::Remote {
library_identity: _,
node_identity,
library,
} => {
// TODO: Support `Range` requests and `ETag` headers
Ok(request_to_remote_node(
let (tx, mut rx) = tokio::sync::mpsc::channel::<io::Result<Bytes>>(150);
request_file(
state.node.p2p.p2p.clone(),
identity,
request,
node_identity,
&library.id,
&library.identity,
file_path_pub_id,
Range::Full,
MpscToAsyncWrite::new(PollSender::new(tx)),
)
.await)
.await
.map_err(|err| {
error!("Error requesting file {file_path_pub_id:?} from node {:?}: {err:?}", library.identity.to_remote_identity());
internal_server_error(())
})?;
// TODO: Content Type
Ok(InfallibleResponse::builder().status(StatusCode::OK).body(
body::boxed(StreamBody::new(stream! {
while let Some(item) = rx.recv().await {
yield item;
}
})),
))
}
}
},

View file

@ -732,7 +732,7 @@ async fn sync_rx_actor(
InvalidateOperationEvent::all(),
)),
SyncMessage::Created => {
p2p::sync::originator(library.id, &library.sync, &node.p2p).await
p2p::sync::originator(library.clone(), &library.sync, &node.p2p).await
}
}
}

View file

@ -387,12 +387,13 @@ async fn start(
error!("Failed to handle Spacedrop request");
}
Header::Sync(library_id) => {
Header::Sync => {
let Ok(mut tunnel) = Tunnel::responder(stream).await.map_err(|err| {
error!("Failed `Tunnel::responder`: {}", err);
}) else {
return;
};
let library_id = tunnel.library_id();
let Ok(msg) = SyncMessage::from_stream(&mut tunnel).await.map_err(|err| {
error!("Failed `SyncMessage::from_stream`: {}", err);
@ -432,6 +433,19 @@ async fn start(
error!("Failed to handling rspc request with '{remote}': {err:?}");
}
Header::LibraryFile {
file_path_id,
range,
} => {
let remote = stream.remote_identity();
let Err(err) =
operations::library::receiver(stream, file_path_id, range, &node).await
else {
return;
};
error!("Failed to handling library file request with {remote:?} for {file_path_id}: {err:?}");
}
};
});
}

View file

@ -0,0 +1,140 @@
use std::{
error::Error,
path::Path,
sync::{atomic::AtomicBool, Arc},
};
use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_prisma_helpers::file_path_to_handle_p2p_serve_file;
use sd_p2p::{Identity, RemoteIdentity, UnicastStream, P2P};
use sd_p2p_block::{BlockSize, Range, SpaceblockRequest, SpaceblockRequests, Transfer};
use sd_prisma::prisma::file_path;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader},
};
use tracing::debug;
use uuid::Uuid;
use crate::{p2p::Header, Node};
/// Request a file from a remote library
#[allow(unused)]
pub async fn request_file(
p2p: Arc<P2P>,
identity: RemoteIdentity,
library_id: &Uuid,
library_identity: &Identity,
file_path_id: Uuid,
range: Range,
output: impl AsyncWrite + Unpin,
) -> Result<(), Box<dyn Error>> {
let peer = p2p.peers().get(&identity).ok_or("Peer offline")?.clone();
let mut stream = peer.new_stream().await?;
stream
.write_all(
&Header::LibraryFile {
file_path_id,
range: range.clone(),
}
.to_bytes(),
)
.await?;
let mut stream = sd_p2p_tunnel::Tunnel::initiator(stream, library_id, library_identity).await?;
let block_size = BlockSize::from_stream(&mut stream).await?;
let size = stream.read_u64_le().await?;
Transfer::new(
&SpaceblockRequests {
id: Uuid::new_v4(),
block_size,
requests: vec![SpaceblockRequest {
name: "_".to_string(),
size,
range,
}],
},
|percent| debug!("P2P receiving file path {file_path_id:?} - progress {percent}%"),
&Arc::new(AtomicBool::new(false)),
)
.receive(&mut stream, output)
.await;
Ok(())
}
pub(crate) async fn receiver(
stream: UnicastStream,
file_path_id: Uuid,
range: Range,
node: &Arc<Node>,
) -> Result<(), Box<dyn Error>> {
debug!(
"Received library request from peer '{}'",
stream.remote_identity()
);
// The tunnel takes care of authentication and encrypts all traffic to the library to be certain we are talking to a node with the library.
let mut stream = sd_p2p_tunnel::Tunnel::responder(stream).await?;
let library = node
.libraries
.get_library(&stream.library_id())
.await
.ok_or_else(|| format!("Library not found: {:?}", stream.library_id()))?;
let file_path = library
.db
.file_path()
.find_unique(file_path::pub_id::equals(file_path_id.as_bytes().to_vec()))
.select(file_path_to_handle_p2p_serve_file::select())
.exec()
.await?
.ok_or_else(|| {
format!(
"File path {file_path_id:?} not found in {:?}",
stream.library_id()
)
})?;
let location = file_path.location.as_ref().expect("included in query");
let location_path = location.path.as_ref().expect("included in query");
let path =
Path::new(location_path).join(IsolatedFilePathData::try_from((location.id, &file_path))?);
debug!(
"Serving path {path:?} for library {:?} over P2P",
library.id
);
let file = File::open(&path).await?;
let metadata = file.metadata().await?;
let block_size = BlockSize::from_file_size(metadata.len());
stream.write_all(&block_size.to_bytes()).await?;
stream.write_all(&metadata.len().to_le_bytes()).await?;
let file = BufReader::new(file);
Transfer::new(
&SpaceblockRequests {
id: Uuid::new_v4(),
block_size,
requests: vec![SpaceblockRequest {
name: "_".into(),
size: metadata.len(),
range,
}],
},
|percent| debug!("P2P loading file path {file_path_id:?} - progress {percent}%"),
// TODO: Properly handle cancellation with webview
&Arc::new(AtomicBool::new(false)),
)
.send(&mut stream, file)
.await?;
Ok(())
}

View file

@ -1,6 +1,8 @@
pub mod library;
pub mod ping;
pub mod rspc;
pub mod spacedrop;
pub use library::request_file;
pub use rspc::remote_rspc;
pub use spacedrop::spacedrop;

View file

@ -1,4 +1,4 @@
use sd_p2p_block::{SpaceblockRequests, SpaceblockRequestsError};
use sd_p2p_block::{Range, SpaceblockRequests, SpaceblockRequestsError};
use sd_p2p_proto::{decode, encode};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt};
@ -7,12 +7,20 @@ use uuid::Uuid;
/// TODO
#[derive(Debug, PartialEq, Eq)]
pub enum Header {
// TODO: Split out cause this is a broadcast
/// Basic pin protocol for demonstrating the P2P system
Ping,
/// Spacedrop file sending
Spacedrop(SpaceblockRequests),
Sync(Uuid),
/// Used for sending sync messages between nodes.
Sync,
// A HTTP server used for rspc requests and streaming files
RspcRemote,
// Request a file within a library
// We don't include a library ID here as it's taken care of by `sd_p2p_tunnel::Tunnel`.
LibraryFile {
file_path_id: Uuid,
range: Range,
},
}
#[derive(Debug, Error)]
@ -23,8 +31,12 @@ pub enum HeaderError {
DiscriminatorInvalid(u8),
#[error("error reading spacedrop request: {0}")]
SpacedropRequest(#[from] SpaceblockRequestsError),
#[error("error reading sync request: {0}")]
SyncRequest(decode::Error),
#[error("error with library file decode '{0}'")]
LibraryFileDecodeError(decode::Error),
#[error("error with library file io '{0}'")]
LibraryFileIoError(std::io::Error),
#[error("invalid range discriminator for library file req '{0}'")]
LibraryDiscriminatorInvalid(u8),
}
impl Header {
@ -39,12 +51,32 @@ impl Header {
SpaceblockRequests::from_stream(stream).await?,
)),
1 => Ok(Self::Ping),
3 => Ok(Self::Sync(
decode::uuid(stream)
.await
.map_err(HeaderError::SyncRequest)?,
)),
3 => Ok(Self::Sync),
5 => Ok(Self::RspcRemote),
6 => Ok(Self::LibraryFile {
file_path_id: decode::uuid(stream)
.await
.map_err(HeaderError::LibraryFileDecodeError)?,
range: match stream
.read_u8()
.await
.map_err(HeaderError::LibraryFileIoError)?
{
0 => Range::Full,
1 => {
let start = stream
.read_u64_le()
.await
.map_err(HeaderError::LibraryFileIoError)?;
let end = stream
.read_u64_le()
.await
.map_err(HeaderError::LibraryFileIoError)?;
Range::Partial(start..end)
}
d => return Err(HeaderError::LibraryDiscriminatorInvalid(d)),
},
}),
d => Err(HeaderError::DiscriminatorInvalid(d)),
}
}
@ -57,12 +89,17 @@ impl Header {
bytes
}
Self::Ping => vec![1],
Self::Sync(uuid) => {
let mut bytes = vec![3];
encode::uuid(&mut bytes, uuid);
bytes
}
Self::Sync => vec![3],
Self::RspcRemote => vec![5],
Self::LibraryFile {
file_path_id,
range,
} => {
let mut buf = vec![6];
encode::uuid(&mut buf, file_path_id);
buf.extend_from_slice(&range.to_bytes());
buf
}
}
}
}

View file

@ -12,7 +12,6 @@ use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing::*;
use uuid::Uuid;
use super::P2PManager;
@ -58,6 +57,7 @@ mod originator {
#[tokio::test]
async fn test() {
use sd_sync::CRDTOperation;
use uuid::Uuid;
{
let original = Operations(CompressedCRDTOperations::new(vec![]));
@ -84,28 +84,33 @@ mod originator {
}
/// REMEMBER: This only syncs one direction!
pub async fn run(library_id: Uuid, sync: &Arc<sync::Manager>, p2p: &Arc<super::P2PManager>) {
for (remote_identity, peer) in p2p.get_library_instances(&library_id) {
pub async fn run(
library: Arc<Library>,
sync: &Arc<sync::Manager>,
p2p: &Arc<super::P2PManager>,
) {
for (remote_identity, peer) in p2p.get_library_instances(&library.id) {
if !peer.is_connected() {
continue;
};
let sync = sync.clone();
let library = library.clone();
tokio::spawn(async move {
debug!(
"Alerting peer '{remote_identity:?}' of new sync events for library '{library_id:?}'"
"Alerting peer {remote_identity:?} of new sync events for library {:?}",
library.id
);
let mut stream = peer.new_stream().await.unwrap();
stream
.write_all(&Header::Sync(library_id).to_bytes())
stream.write_all(&Header::Sync.to_bytes()).await.unwrap();
let mut tunnel = Tunnel::initiator(stream, &library.id, &library.identity)
.await
.unwrap();
let mut tunnel = Tunnel::initiator(stream).await.unwrap();
tunnel
.write_all(&SyncMessage::NewOperations.to_bytes())
.await

View file

@ -9,6 +9,8 @@ repository.workspace = true
[dependencies]
# Spacedrive Sub-crates
sd-p2p = { path = "../../" }
sd-p2p-proto = { path = "../proto" }
tokio = { workspace = true, features = ["io-util"] }
thiserror = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

View file

@ -4,11 +4,13 @@ use std::{
task::{Context, Poll},
};
use sd_p2p_proto::{decode, encode};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use thiserror::Error;
use sd_p2p::UnicastStream;
use sd_p2p::{Identity, RemoteIdentity, UnicastStream};
use uuid::Uuid;
#[derive(Debug, Error)]
pub enum TunnelError {
@ -18,25 +20,60 @@ pub enum TunnelError {
DiscriminatorReadError,
#[error("Invalid discriminator. Is this stream actually a tunnel?")]
InvalidDiscriminator,
#[error("Error sending library id: {0:?}")]
ErrorSendingLibraryId(io::Error),
#[error("Error receiving library id: {0:?}")]
ErrorReceivingLibraryId(decode::Error),
}
/// An encrypted tunnel between two libraries.
///
/// This sits on top of the existing node to node encryption provided by Quic.
///
/// It's primarily designed to avoid an attack where traffic flows:
/// node <-> attacker node <-> node
/// The attackers node can't break TLS but if they get in the middle they can present their own node identity to each side and then intercept library related traffic.
/// To avoid that we use this tunnel to encrypt all library related traffic so it can only be decoded by another instance of the same library.
#[derive(Debug)]
pub struct Tunnel {
stream: UnicastStream,
library_remote_id: RemoteIdentity,
library_id: Uuid,
}
impl Tunnel {
pub async fn initiator(mut stream: UnicastStream) -> Result<Self, TunnelError> {
/// Create a new tunnel.
///
/// This should be used by the node that initiated the request which this tunnel is used for.
pub async fn initiator(
mut stream: UnicastStream,
library_id: &Uuid,
library_identity: &Identity,
) -> Result<Self, TunnelError> {
stream
.write_all(&[b'T'])
.await
.map_err(|_| TunnelError::DiscriminatorWriteError)?;
// TODO: Do pairing + authentication
let mut buf = vec![];
encode::uuid(&mut buf, library_id);
stream
.write_all(&buf)
.await
.map_err(TunnelError::ErrorSendingLibraryId)?;
Ok(Self { stream })
// TODO: Do encryption tings
Ok(Self {
stream,
library_id: *library_id,
library_remote_id: library_identity.to_remote_identity(),
})
}
/// Create a new tunnel.
///
/// This should be used by the node that responded to the request which this tunnel is used for.
pub async fn responder(mut stream: UnicastStream) -> Result<Self, TunnelError> {
let discriminator = stream
.read_u8()
@ -46,9 +83,34 @@ impl Tunnel {
return Err(TunnelError::InvalidDiscriminator);
}
// TODO: Do pairing + authentication
let library_id = decode::uuid(&mut stream)
.await
.map_err(TunnelError::ErrorReceivingLibraryId)?;
Ok(Self { stream })
// TODO: Do encryption tings
Ok(Self {
// TODO: This is wrong but it's fine for now cause we don't use it.
// TODO: Will fix this in a follow up PR when I add encryption
library_remote_id: stream.remote_identity(),
stream,
library_id,
})
}
/// The the ID of the library being tunneled.
pub fn library_id(&self) -> Uuid {
self.library_id
}
/// Get the `RemoteIdentity` of the peer on the other end of the tunnel.
pub fn node_remote_identity(&self) -> RemoteIdentity {
self.stream.remote_identity()
}
/// Get the `RemoteIdentity` of the library instance on the other end of the tunnel.
pub fn library_remote_identity(&self) -> RemoteIdentity {
self.library_remote_id
}
}
@ -83,5 +145,3 @@ impl AsyncWrite for Tunnel {
Pin::new(&mut self.get_mut().stream).poll_shutdown(cx)
}
}
// TODO: Unit tests

View file

@ -21,7 +21,10 @@ pub trait OperationFactory {
fn get_clock(&self) -> &HLC;
fn get_instance(&self) -> Uuid;
fn new_op<TSyncId: SyncId>(&self, id: &TSyncId, data: CRDTOperationData) -> CRDTOperation {
fn new_op<TSyncId: SyncId>(&self, id: &TSyncId, data: CRDTOperationData) -> CRDTOperation
where
TSyncId::Model: crate::SyncModel,
{
let timestamp = self.get_clock().new_timestamp();
CRDTOperation {

View file

@ -2,7 +2,7 @@ use prisma_client_rust::ModelTypes;
use serde::{de::DeserializeOwned, Serialize};
pub trait SyncId: Serialize + DeserializeOwned {
type Model: SyncModel;
type Model;
}
pub trait SyncModel: ModelTypes {

View file

@ -55,3 +55,11 @@ Unimplemented
In an earlier version of the P2P system we had a method for sending sync messages to other nodes over the peer to peer connection, however this was removed during some refactoring of the sync system.
The code for it could be taken from [here](https://github.com/spacedriveapp/spacedrive/blob/aa72c083c2e5f6cf33f3c1fb66283e5fe0d1ba3b/core/src/p2p/pairing/mod.rs) and upgraded to account for changes to the sync and P2P system to bring back this functionality.
## Loading remote files
TODO - Loading file within location over P2P
## Sync preview media
https://linear.app/spacedriveapp/issue/ENG-910/sync-preview-media