From 0392c781d75d8f8a571ed43a61ce90e11c7d73d5 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Thu, 30 May 2024 22:07:28 +0800 Subject: [PATCH] Serve files over p2p (#2523) * serve files over p2p * include location instance id in sync * Fix P2P addressing? --------- Co-authored-by: Brendan Allan --- Cargo.lock | 2 + core/crates/prisma-helpers/src/lib.rs | 1 + core/crates/sync/src/backfill.rs | 14 +++ core/prisma/schema.prisma | 7 +- core/src/custom_uri/mod.rs | 79 ++++++++++---- core/src/library/manager/mod.rs | 2 +- core/src/p2p/manager.rs | 16 ++- core/src/p2p/operations/library.rs | 140 +++++++++++++++++++++++++ core/src/p2p/operations/mod.rs | 2 + core/src/p2p/protocol.rs | 67 +++++++++--- core/src/p2p/sync/mod.rs | 21 ++-- crates/p2p/crates/tunnel/Cargo.toml | 2 + crates/p2p/crates/tunnel/src/tunnel.rs | 76 ++++++++++++-- crates/sync/src/factory.rs | 5 +- crates/sync/src/model_traits.rs | 2 +- docs/developers/p2p/protocols.mdx | 8 ++ 16 files changed, 384 insertions(+), 60 deletions(-) create mode 100644 core/src/p2p/operations/library.rs diff --git a/Cargo.lock b/Cargo.lock index 35000242d..e0ddd75ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9546,8 +9546,10 @@ name = "sd-p2p-tunnel" version = "0.1.0" dependencies = [ "sd-p2p", + "sd-p2p-proto", "thiserror", "tokio", + "uuid", ] [[package]] diff --git a/core/crates/prisma-helpers/src/lib.rs b/core/crates/prisma-helpers/src/lib.rs index d562ab534..8f1fe20b9 100644 --- a/core/crates/prisma-helpers/src/lib.rs +++ b/core/crates/prisma-helpers/src/lib.rs @@ -112,6 +112,7 @@ file_path::select!(file_path_to_handle_custom_uri { instance: select { identity remote_identity + node_remote_identity } } }); diff --git a/core/crates/sync/src/backfill.rs b/core/crates/sync/src/backfill.rs index a383f8742..0bb6e6a62 100644 --- a/core/crates/sync/src/backfill.rs +++ b/core/crates/sync/src/backfill.rs @@ -70,6 +70,12 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta 
.find_many(vec![location::id::gt(cursor)]) .order_by(location::id::order(SortOrder::Asc)) .take(1000) + .include(location::include!({ + instance: select { + id + pub_id + } + })) .exec() }, |location| location.id, @@ -108,6 +114,14 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta ), option_sync_entry!(l.hidden, hidden), option_sync_entry!(l.date_created, date_created), + option_sync_entry!( + l.instance.map(|i| { + prisma_sync::instance::SyncId { + pub_id: i.pub_id, + } + }), + instance + ), ], ), ) diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index 821a34a52..35358a221 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -68,7 +68,7 @@ model Node { // represents a single `.db` file (SQLite DB) that is paired to the current library. // A `LibraryInstance` is always owned by a single `Node` but it's possible for that node to change (or two to be owned by a single node). -/// @local +/// @local(id: pub_id) model Instance { id Int @id @default(autoincrement()) // This is is NOT globally unique pub_id Bytes @unique // This UUID is meaningless and exists soley cause the `uhlc::ID` must be 16-bit. Really this should be derived from the `identity` field. @@ -148,8 +148,7 @@ model Location { scan_state Int @default(0) // Enum: sd_core::location::ScanState - /// @local - // this is just a client side cache which is annoying but oh well (@brendan) + // this should just be a local-only cache but it's too much effort to broadcast online locations rn (@brendan) instance_id Int? instance Instance? @relation(fields: [instance_id], references: [id], onDelete: SetNull) @@ -627,7 +626,7 @@ model IndexerRule { id Int @id @default(autoincrement()) pub_id Bytes @unique - name String? @unique + name String? @unique default Boolean? rules_per_kind Bytes? date_created DateTime? 
diff --git a/core/src/custom_uri/mod.rs b/core/src/custom_uri/mod.rs index 37ad328f4..a57184990 100644 --- a/core/src/custom_uri/mod.rs +++ b/core/src/custom_uri/mod.rs @@ -2,18 +2,23 @@ use crate::{ api::{utils::InvalidateOperationEvent, CoreEvent}, library::Library, object::media::old_thumbnail::WEBP_EXTENSION, - p2p::operations, + p2p::operations::{self, request_file}, util::InfallibleResponse, Node, }; +use async_stream::stream; +use bytes::Bytes; +use mpsc_to_async_write::MpscToAsyncWrite; use sd_core_file_path_helper::IsolatedFilePathData; use sd_core_prisma_helpers::file_path_to_handle_custom_uri; use sd_file_ext::text::is_text; use sd_p2p::{RemoteIdentity, P2P}; +use sd_p2p_block::Range; use sd_prisma::prisma::{file_path, location}; use sd_utils::db::maybe_missing; +use tokio_util::sync::PollSender; use std::{ cmp::min, @@ -26,7 +31,7 @@ use std::{ }; use axum::{ - body::{self, Body, BoxBody, Full}, + body::{self, Body, BoxBody, Full, StreamBody}, extract::{self, State}, http::{HeaderMap, HeaderValue, Request, Response, StatusCode}, middleware, @@ -68,7 +73,11 @@ pub enum ServeFrom { /// Serve from the local filesystem Local, /// Serve from a specific instance - Remote(RemoteIdentity), + Remote { + library_identity: RemoteIdentity, + node_identity: RemoteIdentity, + library: Arc, + }, } #[derive(Clone)] @@ -176,17 +185,29 @@ async fn get_or_init_lru_entry( let path = Path::new(path) .join(IsolatedFilePathData::try_from((location_id, &file_path)).map_err(not_found)?); - let identity = + let library_identity = RemoteIdentity::from_bytes(&instance.remote_identity).map_err(internal_server_error)?; + let node_identity = RemoteIdentity::from_bytes( + instance + .node_remote_identity + .as_ref() + .expect("node_remote_identity is required"), + ) + .map_err(internal_server_error)?; + let lru_entry = CacheValue { name: path, ext: maybe_missing(file_path.extension, "extension").map_err(not_found)?, file_path_pub_id: 
Uuid::from_slice(&file_path.pub_id).map_err(internal_server_error)?, - serve_from: if identity == library.identity.to_remote_identity() { + serve_from: if library_identity == library.identity.to_remote_identity() { ServeFrom::Local } else { - ServeFrom::Remote(identity) + ServeFrom::Remote { + library_identity, + node_identity, + library: library.clone(), + } }, }; @@ -240,19 +261,16 @@ pub fn base_router() -> Router { .route( "/file/:lib_id/:loc_id/:path_id", get( - |State(state): State, - path: ExtractedPath, - mut request: Request| async move { - let part_parts = path.0.clone(); + |State(state): State, path: ExtractedPath, request: Request| async move { let ( CacheValue { name: file_path_full_path, ext: extension, - file_path_pub_id: _file_path_pub_id, + file_path_pub_id, serve_from, .. }, - _library, + library, ) = get_or_init_lru_entry(&state, path).await?; match serve_from { @@ -288,18 +306,37 @@ pub fn base_router() -> Router { serve_file(file, Ok(metadata), request.into_parts().0, resp).await } - ServeFrom::Remote(identity) => { - *request.uri_mut() = - format!("/file/{}/{}/{}", part_parts.0, part_parts.1, part_parts.2) - .parse() - .expect("url was validated by Axum"); + ServeFrom::Remote { + library_identity: _, + node_identity, + library, + } => { + // TODO: Support `Range` requests and `ETag` headers - Ok(request_to_remote_node( + let (tx, mut rx) = tokio::sync::mpsc::channel::>(150); + request_file( state.node.p2p.p2p.clone(), - identity, - request, + node_identity, + &library.id, + &library.identity, + file_path_pub_id, + Range::Full, + MpscToAsyncWrite::new(PollSender::new(tx)), ) - .await) + .await + .map_err(|err| { + error!("Error requesting file {file_path_pub_id:?} from node {:?}: {err:?}", library.identity.to_remote_identity()); + internal_server_error(()) + })?; + + // TODO: Content Type + Ok(InfallibleResponse::builder().status(StatusCode::OK).body( + body::boxed(StreamBody::new(stream! 
{ + while let Some(item) = rx.recv().await { + yield item; + } + })), + )) } } }, diff --git a/core/src/library/manager/mod.rs b/core/src/library/manager/mod.rs index e81085921..b4ad0d1b0 100644 --- a/core/src/library/manager/mod.rs +++ b/core/src/library/manager/mod.rs @@ -732,7 +732,7 @@ async fn sync_rx_actor( InvalidateOperationEvent::all(), )), SyncMessage::Created => { - p2p::sync::originator(library.id, &library.sync, &node.p2p).await + p2p::sync::originator(library.clone(), &library.sync, &node.p2p).await } } } diff --git a/core/src/p2p/manager.rs b/core/src/p2p/manager.rs index b14e2ea98..b9b6f5c02 100644 --- a/core/src/p2p/manager.rs +++ b/core/src/p2p/manager.rs @@ -387,12 +387,13 @@ async fn start( error!("Failed to handle Spacedrop request"); } - Header::Sync(library_id) => { + Header::Sync => { let Ok(mut tunnel) = Tunnel::responder(stream).await.map_err(|err| { error!("Failed `Tunnel::responder`: {}", err); }) else { return; }; + let library_id = tunnel.library_id(); let Ok(msg) = SyncMessage::from_stream(&mut tunnel).await.map_err(|err| { error!("Failed `SyncMessage::from_stream`: {}", err); @@ -432,6 +433,19 @@ async fn start( error!("Failed to handling rspc request with '{remote}': {err:?}"); } + Header::LibraryFile { + file_path_id, + range, + } => { + let remote = stream.remote_identity(); + let Err(err) = + operations::library::receiver(stream, file_path_id, range, &node).await + else { + return; + }; + + error!("Failed to handling library file request with {remote:?} for {file_path_id}: {err:?}"); + } }; }); } diff --git a/core/src/p2p/operations/library.rs b/core/src/p2p/operations/library.rs new file mode 100644 index 000000000..b8fb14999 --- /dev/null +++ b/core/src/p2p/operations/library.rs @@ -0,0 +1,140 @@ +use std::{ + error::Error, + path::Path, + sync::{atomic::AtomicBool, Arc}, +}; + +use sd_core_file_path_helper::IsolatedFilePathData; +use sd_core_prisma_helpers::file_path_to_handle_p2p_serve_file; +use sd_p2p::{Identity, 
RemoteIdentity, UnicastStream, P2P}; +use sd_p2p_block::{BlockSize, Range, SpaceblockRequest, SpaceblockRequests, Transfer}; +use sd_prisma::prisma::file_path; +use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader}, +}; +use tracing::debug; +use uuid::Uuid; + +use crate::{p2p::Header, Node}; + +/// Request a file from a remote library +#[allow(unused)] +pub async fn request_file( + p2p: Arc, + identity: RemoteIdentity, + library_id: &Uuid, + library_identity: &Identity, + file_path_id: Uuid, + range: Range, + output: impl AsyncWrite + Unpin, +) -> Result<(), Box> { + let peer = p2p.peers().get(&identity).ok_or("Peer offline")?.clone(); + let mut stream = peer.new_stream().await?; + + stream + .write_all( + &Header::LibraryFile { + file_path_id, + range: range.clone(), + } + .to_bytes(), + ) + .await?; + + let mut stream = sd_p2p_tunnel::Tunnel::initiator(stream, library_id, library_identity).await?; + + let block_size = BlockSize::from_stream(&mut stream).await?; + let size = stream.read_u64_le().await?; + + Transfer::new( + &SpaceblockRequests { + id: Uuid::new_v4(), + block_size, + requests: vec![SpaceblockRequest { + name: "_".to_string(), + size, + range, + }], + }, + |percent| debug!("P2P receiving file path {file_path_id:?} - progress {percent}%"), + &Arc::new(AtomicBool::new(false)), + ) + .receive(&mut stream, output) + .await; + + Ok(()) +} + +pub(crate) async fn receiver( + stream: UnicastStream, + file_path_id: Uuid, + range: Range, + node: &Arc, +) -> Result<(), Box> { + debug!( + "Received library request from peer '{}'", + stream.remote_identity() + ); + + // The tunnel takes care of authentication and encrypts all traffic to the library to be certain we are talking to a node with the library. 
+ let mut stream = sd_p2p_tunnel::Tunnel::responder(stream).await?; + + let library = node + .libraries + .get_library(&stream.library_id()) + .await + .ok_or_else(|| format!("Library not found: {:?}", stream.library_id()))?; + + let file_path = library + .db + .file_path() + .find_unique(file_path::pub_id::equals(file_path_id.as_bytes().to_vec())) + .select(file_path_to_handle_p2p_serve_file::select()) + .exec() + .await? + .ok_or_else(|| { + format!( + "File path {file_path_id:?} not found in {:?}", + stream.library_id() + ) + })?; + + let location = file_path.location.as_ref().expect("included in query"); + let location_path = location.path.as_ref().expect("included in query"); + let path = + Path::new(location_path).join(IsolatedFilePathData::try_from((location.id, &file_path))?); + + debug!( + "Serving path {path:?} for library {:?} over P2P", + library.id + ); + + let file = File::open(&path).await?; + + let metadata = file.metadata().await?; + let block_size = BlockSize::from_file_size(metadata.len()); + + stream.write_all(&block_size.to_bytes()).await?; + stream.write_all(&metadata.len().to_le_bytes()).await?; + + let file = BufReader::new(file); + Transfer::new( + &SpaceblockRequests { + id: Uuid::new_v4(), + block_size, + requests: vec![SpaceblockRequest { + name: "_".into(), + size: metadata.len(), + range, + }], + }, + |percent| debug!("P2P loading file path {file_path_id:?} - progress {percent}%"), + // TODO: Properly handle cancellation with webview + &Arc::new(AtomicBool::new(false)), + ) + .send(&mut stream, file) + .await?; + + Ok(()) +} diff --git a/core/src/p2p/operations/mod.rs b/core/src/p2p/operations/mod.rs index 02d01766d..32ef65e29 100644 --- a/core/src/p2p/operations/mod.rs +++ b/core/src/p2p/operations/mod.rs @@ -1,6 +1,8 @@ +pub mod library; pub mod ping; pub mod rspc; pub mod spacedrop; +pub use library::request_file; pub use rspc::remote_rspc; pub use spacedrop::spacedrop; diff --git a/core/src/p2p/protocol.rs 
b/core/src/p2p/protocol.rs index a96d60c9b..e6c40e3d0 100644 --- a/core/src/p2p/protocol.rs +++ b/core/src/p2p/protocol.rs @@ -1,4 +1,4 @@ -use sd_p2p_block::{SpaceblockRequests, SpaceblockRequestsError}; +use sd_p2p_block::{Range, SpaceblockRequests, SpaceblockRequestsError}; use sd_p2p_proto::{decode, encode}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -7,12 +7,20 @@ use uuid::Uuid; /// TODO #[derive(Debug, PartialEq, Eq)] pub enum Header { - // TODO: Split out cause this is a broadcast + /// Basic ping protocol for demonstrating the P2P system Ping, + /// Spacedrop file sending Spacedrop(SpaceblockRequests), - Sync(Uuid), + /// Used for sending sync messages between nodes. + Sync, // A HTTP server used for rspc requests and streaming files RspcRemote, + // Request a file within a library + // We don't include a library ID here as it's taken care of by `sd_p2p_tunnel::Tunnel`. + LibraryFile { + file_path_id: Uuid, + range: Range, + }, } #[derive(Debug, Error)] @@ -23,8 +31,12 @@ pub enum HeaderError { DiscriminatorInvalid(u8), #[error("error reading spacedrop request: {0}")] SpacedropRequest(#[from] SpaceblockRequestsError), - #[error("error reading sync request: {0}")] - SyncRequest(decode::Error), + #[error("error with library file decode '{0}'")] + LibraryFileDecodeError(decode::Error), + #[error("error with library file io '{0}'")] + LibraryFileIoError(std::io::Error), + #[error("invalid range discriminator for library file req '{0}'")] + LibraryDiscriminatorInvalid(u8), } impl Header { @@ -39,12 +51,32 @@ impl Header { SpaceblockRequests::from_stream(stream).await?, )), 1 => Ok(Self::Ping), - 3 => Ok(Self::Sync( - decode::uuid(stream) - .await - .map_err(HeaderError::SyncRequest)?, - )), + 3 => Ok(Self::Sync), 5 => Ok(Self::RspcRemote), + 6 => Ok(Self::LibraryFile { + file_path_id: decode::uuid(stream) + .await + .map_err(HeaderError::LibraryFileDecodeError)?, + range: match stream + .read_u8() + .await + 
.map_err(HeaderError::LibraryFileIoError)? + { + 0 => Range::Full, + 1 => { + let start = stream + .read_u64_le() + .await + .map_err(HeaderError::LibraryFileIoError)?; + let end = stream + .read_u64_le() + .await + .map_err(HeaderError::LibraryFileIoError)?; + Range::Partial(start..end) + } + d => return Err(HeaderError::LibraryDiscriminatorInvalid(d)), + }, + }), d => Err(HeaderError::DiscriminatorInvalid(d)), } } @@ -57,12 +89,17 @@ impl Header { bytes } Self::Ping => vec![1], - Self::Sync(uuid) => { - let mut bytes = vec![3]; - encode::uuid(&mut bytes, uuid); - bytes - } + Self::Sync => vec![3], Self::RspcRemote => vec![5], + Self::LibraryFile { + file_path_id, + range, + } => { + let mut buf = vec![6]; + encode::uuid(&mut buf, file_path_id); + buf.extend_from_slice(&range.to_bytes()); + buf + } } } } diff --git a/core/src/p2p/sync/mod.rs b/core/src/p2p/sync/mod.rs index e4cc31f07..5cbb0b0e3 100644 --- a/core/src/p2p/sync/mod.rs +++ b/core/src/p2p/sync/mod.rs @@ -12,7 +12,6 @@ use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tracing::*; -use uuid::Uuid; use super::P2PManager; @@ -58,6 +57,7 @@ mod originator { #[tokio::test] async fn test() { use sd_sync::CRDTOperation; + use uuid::Uuid; { let original = Operations(CompressedCRDTOperations::new(vec![])); @@ -84,28 +84,33 @@ mod originator { } /// REMEMBER: This only syncs one direction! 
- pub async fn run(library_id: Uuid, sync: &Arc, p2p: &Arc) { - for (remote_identity, peer) in p2p.get_library_instances(&library_id) { + pub async fn run( + library: Arc, + sync: &Arc, + p2p: &Arc, + ) { + for (remote_identity, peer) in p2p.get_library_instances(&library.id) { if !peer.is_connected() { continue; }; let sync = sync.clone(); + let library = library.clone(); tokio::spawn(async move { debug!( - "Alerting peer '{remote_identity:?}' of new sync events for library '{library_id:?}'" + "Alerting peer {remote_identity:?} of new sync events for library {:?}", + library.id ); let mut stream = peer.new_stream().await.unwrap(); - stream - .write_all(&Header::Sync(library_id).to_bytes()) + stream.write_all(&Header::Sync.to_bytes()).await.unwrap(); + + let mut tunnel = Tunnel::initiator(stream, &library.id, &library.identity) .await .unwrap(); - let mut tunnel = Tunnel::initiator(stream).await.unwrap(); - tunnel .write_all(&SyncMessage::NewOperations.to_bytes()) .await diff --git a/crates/p2p/crates/tunnel/Cargo.toml b/crates/p2p/crates/tunnel/Cargo.toml index ff37c4c8b..5c61b3017 100644 --- a/crates/p2p/crates/tunnel/Cargo.toml +++ b/crates/p2p/crates/tunnel/Cargo.toml @@ -9,6 +9,8 @@ repository.workspace = true [dependencies] # Spacedrive Sub-crates sd-p2p = { path = "../../" } +sd-p2p-proto = { path = "../proto" } tokio = { workspace = true, features = ["io-util"] } thiserror = { workspace = true } +uuid = { workspace = true, features = ["v4"] } diff --git a/crates/p2p/crates/tunnel/src/tunnel.rs b/crates/p2p/crates/tunnel/src/tunnel.rs index ae559c158..be8ea6554 100644 --- a/crates/p2p/crates/tunnel/src/tunnel.rs +++ b/crates/p2p/crates/tunnel/src/tunnel.rs @@ -4,11 +4,13 @@ use std::{ task::{Context, Poll}, }; +use sd_p2p_proto::{decode, encode}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use thiserror::Error; -use sd_p2p::UnicastStream; +use sd_p2p::{Identity, RemoteIdentity, UnicastStream}; +use uuid::Uuid; 
#[derive(Debug, Error)] pub enum TunnelError { @@ -18,25 +20,60 @@ pub enum TunnelError { DiscriminatorReadError, #[error("Invalid discriminator. Is this stream actually a tunnel?")] InvalidDiscriminator, + #[error("Error sending library id: {0:?}")] + ErrorSendingLibraryId(io::Error), + #[error("Error receiving library id: {0:?}")] + ErrorReceivingLibraryId(decode::Error), } +/// An encrypted tunnel between two libraries. +/// +/// This sits on top of the existing node to node encryption provided by Quic. +/// +/// It's primarily designed to avoid an attack where traffic flows: +/// node <-> attacker node <-> node +/// The attackers node can't break TLS but if they get in the middle they can present their own node identity to each side and then intercept library related traffic. +/// To avoid that we use this tunnel to encrypt all library related traffic so it can only be decoded by another instance of the same library. #[derive(Debug)] pub struct Tunnel { stream: UnicastStream, + library_remote_id: RemoteIdentity, + library_id: Uuid, } impl Tunnel { - pub async fn initiator(mut stream: UnicastStream) -> Result { + /// Create a new tunnel. + /// + /// This should be used by the node that initiated the request which this tunnel is used for. + pub async fn initiator( + mut stream: UnicastStream, + library_id: &Uuid, + library_identity: &Identity, + ) -> Result { stream .write_all(&[b'T']) .await .map_err(|_| TunnelError::DiscriminatorWriteError)?; - // TODO: Do pairing + authentication + let mut buf = vec![]; + encode::uuid(&mut buf, library_id); + stream + .write_all(&buf) + .await + .map_err(TunnelError::ErrorSendingLibraryId)?; - Ok(Self { stream }) + // TODO: Do encryption tings + + Ok(Self { + stream, + library_id: *library_id, + library_remote_id: library_identity.to_remote_identity(), + }) } + /// Create a new tunnel. + /// + /// This should be used by the node that responded to the request which this tunnel is used for. 
pub async fn responder(mut stream: UnicastStream) -> Result { let discriminator = stream .read_u8() .await @@ -46,9 +83,34 @@ impl Tunnel { return Err(TunnelError::InvalidDiscriminator); } - // TODO: Do pairing + authentication + let library_id = decode::uuid(&mut stream) + .await + .map_err(TunnelError::ErrorReceivingLibraryId)?; - Ok(Self { stream }) + // TODO: Do encryption tings + + Ok(Self { + // TODO: This is wrong but it's fine for now cause we don't use it. + // TODO: Will fix this in a follow up PR when I add encryption + library_remote_id: stream.remote_identity(), + stream, + library_id, + }) + } + + /// The ID of the library being tunneled. + pub fn library_id(&self) -> Uuid { + self.library_id + } + + /// Get the `RemoteIdentity` of the peer on the other end of the tunnel. + pub fn node_remote_identity(&self) -> RemoteIdentity { + self.stream.remote_identity() + } + + /// Get the `RemoteIdentity` of the library instance on the other end of the tunnel. + pub fn library_remote_identity(&self) -> RemoteIdentity { + self.library_remote_id } } @@ -83,5 +145,3 @@ impl AsyncWrite for Tunnel { Pin::new(&mut self.get_mut().stream).poll_shutdown(cx) } } - -// TODO: Unit tests diff --git a/crates/sync/src/factory.rs b/crates/sync/src/factory.rs index 6a7f41b10..80966b477 100644 --- a/crates/sync/src/factory.rs +++ b/crates/sync/src/factory.rs @@ -21,7 +21,10 @@ pub trait OperationFactory { fn get_clock(&self) -> &HLC; fn get_instance(&self) -> Uuid; - fn new_op(&self, id: &TSyncId, data: CRDTOperationData) -> CRDTOperation { + fn new_op(&self, id: &TSyncId, data: CRDTOperationData) -> CRDTOperation + where + TSyncId::Model: crate::SyncModel, + { let timestamp = self.get_clock().new_timestamp(); CRDTOperation { diff --git a/crates/sync/src/model_traits.rs b/crates/sync/src/model_traits.rs index cd7d2d73f..b0a063f2e 100644 --- a/crates/sync/src/model_traits.rs +++ b/crates/sync/src/model_traits.rs @@ -2,7 +2,7 @@ use prisma_client_rust::ModelTypes; use 
serde::{de::DeserializeOwned, Serialize}; pub trait SyncId: Serialize + DeserializeOwned { - type Model: SyncModel; + type Model; } pub trait SyncModel: ModelTypes { diff --git a/docs/developers/p2p/protocols.mdx b/docs/developers/p2p/protocols.mdx index 90454544a..c3ff8eba9 100644 --- a/docs/developers/p2p/protocols.mdx +++ b/docs/developers/p2p/protocols.mdx @@ -55,3 +55,11 @@ Unimplemented In an earlier version of the P2P system we had a method for sending sync messages to other nodes over the peer to peer connection, however this was removed during some refactoring of the sync system. The code for it could be taken from [here](https://github.com/spacedriveapp/spacedrive/blob/aa72c083c2e5f6cf33f3c1fb66283e5fe0d1ba3b/core/src/p2p/pairing/mod.rs) and upgraded to account for changes to the sync and P2P system to bring back this functionality. + +## Loading remote files + +TODO - Loading file within location over P2P + +## Sync preview media + +https://linear.app/spacedriveapp/issue/ENG-910/sync-preview-media