[ENG-732] Pairing by library (#943)

* library keypair

* allow opening stream to non-connected node + more

* library identity

* fix types

* fix maybe undefined

* don't forgor migrations file

* library manager inside p2p manager

* rename

* `NodeInformation` struct

* node info exchange

* fill in info

* streamify tunnel

* use tunnel for p2p events

* libp2p is annoying + stop leaking private key's

* Clippy cleanup
This commit is contained in:
Oscar Beaumont 2023-06-19 07:13:30 +02:00 committed by GitHub
parent 6975ebb664
commit 0f7a669e5a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
44 changed files with 1355 additions and 468 deletions

220
Cargo.lock generated
View file

@ -605,6 +605,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce"
[[package]]
name = "base16ct"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf"
[[package]]
name = "base36"
version = "0.0.1"
@ -1485,7 +1491,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf4c2f4e1afd912bc40bfd6fed5d9dc1f288e0ba01bfcc835cc5bc3eb13efe15"
dependencies = [
"generic-array",
"rand_core 0.6.4",
"subtle",
"zeroize",
]
[[package]]
@ -1774,7 +1782,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de"
dependencies = [
"const-oid",
"pem-rfc7468",
"pem-rfc7468 0.6.0",
"zeroize",
]
[[package]]
name = "der"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56acb310e15652100da43d130af8d97b509e95af61aab1c5a7939ef24337ee17"
dependencies = [
"const-oid",
"pem-rfc7468 0.7.0",
"zeroize",
]
@ -1887,6 +1906,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer 0.10.4",
"const-oid",
"crypto-common",
"subtle",
]
@ -2011,10 +2031,24 @@ version = "0.14.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c"
dependencies = [
"der",
"elliptic-curve",
"rfc6979",
"signature",
"der 0.6.1",
"elliptic-curve 0.12.3",
"rfc6979 0.3.1",
"signature 1.6.4",
]
[[package]]
name = "ecdsa"
version = "0.16.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0997c976637b606099b9985693efa3581e84e41f5c11ba5255f88711058ad428"
dependencies = [
"der 0.7.6",
"digest 0.10.7",
"elliptic-curve 0.13.5",
"rfc6979 0.4.0",
"signature 2.1.0",
"spki 0.7.2",
]
[[package]]
@ -2023,7 +2057,7 @@ version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91cff35c70bba8a626e3185d8cd48cc11b5437e1a5bcd15b9b5fa3c64b6dfee7"
dependencies = [
"signature",
"signature 1.6.4",
]
[[package]]
@ -2052,18 +2086,39 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3"
dependencies = [
"base16ct",
"base16ct 0.1.1",
"crypto-bigint 0.4.9",
"der",
"der 0.6.1",
"digest 0.10.7",
"ff",
"ff 0.12.1",
"generic-array",
"group",
"group 0.12.1",
"hkdf 0.12.3",
"pem-rfc7468",
"pkcs8",
"pem-rfc7468 0.6.0",
"pkcs8 0.9.0",
"rand_core 0.6.4",
"sec1",
"sec1 0.3.0",
"subtle",
"zeroize",
]
[[package]]
name = "elliptic-curve"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "968405c8fdc9b3bf4df0a6638858cc0b52462836ab6b1c87377785dd09cf1c0b"
dependencies = [
"base16ct 0.2.0",
"crypto-bigint 0.5.2",
"digest 0.10.7",
"ff 0.13.0",
"generic-array",
"group 0.13.0",
"hkdf 0.12.3",
"pem-rfc7468 0.7.0",
"pkcs8 0.10.2",
"rand_core 0.6.4",
"sec1 0.7.2",
"subtle",
"zeroize",
]
@ -2281,6 +2336,16 @@ dependencies = [
"subtle",
]
[[package]]
name = "ff"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ded41244b729663b1e574f1b4fb731469f69f79c17667b5d776b16cda0479449"
dependencies = [
"rand_core 0.6.4",
"subtle",
]
[[package]]
name = "ffmpeg-sys-next"
version = "6.0.1"
@ -2672,6 +2737,7 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
"zeroize",
]
[[package]]
@ -2856,7 +2922,18 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7"
dependencies = [
"ff",
"ff 0.12.1",
"rand_core 0.6.4",
"subtle",
]
[[package]]
name = "group"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63"
dependencies = [
"ff 0.13.0",
"rand_core 0.6.4",
"subtle",
]
@ -5243,8 +5320,8 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594"
dependencies = [
"ecdsa",
"elliptic-curve",
"ecdsa 0.14.8",
"elliptic-curve 0.12.3",
"sha2 0.10.6",
]
@ -5254,8 +5331,20 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc8c5bf642dde52bb9e87c0ecd8ca5a76faac2eeed98dedb7c717997e1080aa"
dependencies = [
"ecdsa",
"elliptic-curve",
"ecdsa 0.14.8",
"elliptic-curve 0.12.3",
"sha2 0.10.6",
]
[[package]]
name = "p384"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70786f51bcc69f6a4c0360e063a4cac5419ef7c5cd5b3c99ad70f3be5ba79209"
dependencies = [
"ecdsa 0.16.7",
"elliptic-curve 0.13.5",
"primeorder",
"sha2 0.10.6",
]
@ -5407,6 +5496,15 @@ dependencies = [
"base64ct",
]
[[package]]
name = "pem-rfc7468"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412"
dependencies = [
"base64ct",
]
[[package]]
name = "percent-encoding"
version = "2.2.0"
@ -5603,8 +5701,18 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba"
dependencies = [
"der",
"spki",
"der 0.6.1",
"spki 0.6.0",
]
[[package]]
name = "pkcs8"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
dependencies = [
"der 0.7.6",
"spki 0.7.2",
]
[[package]]
@ -5720,6 +5828,15 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
[[package]]
name = "primeorder"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c2fcef82c0ec6eefcc179b978446c399b3cdf73c392c35604e399eee6df1ee3"
dependencies = [
"elliptic-curve 0.13.5",
]
[[package]]
name = "prisma-cli"
version = "0.1.0"
@ -6531,6 +6648,16 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rfc6979"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2"
dependencies = [
"hmac 0.12.1",
"subtle",
]
[[package]]
name = "rfd"
version = "0.10.0"
@ -7089,11 +7216,14 @@ name = "sd-p2p"
version = "0.1.0"
dependencies = [
"arc-swap",
"ed25519-dalek",
"flume",
"if-watch",
"libp2p",
"libp2p-quic",
"mdns-sd",
"p384 0.13.0",
"rand_core 0.5.1",
"rmp-serde",
"serde",
"specta",
@ -7159,10 +7289,24 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928"
dependencies = [
"base16ct",
"der",
"base16ct 0.1.1",
"der 0.6.1",
"generic-array",
"pkcs8",
"pkcs8 0.9.0",
"subtle",
"zeroize",
]
[[package]]
name = "sec1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0aec48e813d6b90b15f0b8948af3c63483992dee44c03e9930b3eebdabe046e"
dependencies = [
"base16ct 0.2.0",
"der 0.7.6",
"generic-array",
"pkcs8 0.10.2",
"subtle",
"zeroize",
]
@ -7528,6 +7672,16 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "signature"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e1788eed21689f9cf370582dfc467ef36ed9c707f073528ddafa8d83e3b8500"
dependencies = [
"digest 0.10.7",
"rand_core 0.6.4",
]
[[package]]
name = "simd-adler32"
version = "0.3.5"
@ -7713,7 +7867,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b"
dependencies = [
"base64ct",
"der",
"der 0.6.1",
]
[[package]]
name = "spki"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d1e996ef02c474957d681f1b05213dfb0abab947b446a62d37770b23500184a"
dependencies = [
"base64ct",
"der 0.7.6",
]
[[package]]
@ -9486,23 +9650,23 @@ dependencies = [
"ccm",
"curve25519-dalek 3.2.0",
"der-parser 8.2.0",
"elliptic-curve",
"elliptic-curve 0.12.3",
"hkdf 0.12.3",
"hmac 0.12.1",
"log",
"oid-registry 0.6.1",
"p256",
"p384",
"p384 0.11.2",
"rand 0.8.5",
"rand_core 0.6.4",
"rcgen 0.9.3",
"ring",
"rustls 0.19.1",
"sec1",
"sec1 0.3.0",
"serde",
"sha1",
"sha2 0.10.6",
"signature",
"signature 1.6.4",
"subtle",
"thiserror",
"tokio",

View file

@ -56,7 +56,7 @@ pub fn tauri_error_plugin<R: Runtime>(err: NodeError) -> TauriPlugin<R> {
tauri::plugin::Builder::new("spacedrive")
.js_init_script(format!(
r#"window.__SD_ERROR__ = `{}`;"#,
err.to_string().replace("`", "\"")
err.to_string().replace('`', "\"")
))
.build()
}

View file

@ -24,7 +24,10 @@ const LibraryGeneralSettingsScreen = ({
const form = useZodForm({
schema,
defaultValues: { name: library.config.name, description: library.config.description }
defaultValues: {
name: library.config.name,
description: library.config.description || undefined
}
});
const { mutate: editLibrary } = useBridgeMutation('library.edit');

View file

@ -0,0 +1,17 @@
-- RedefineTables
PRAGMA foreign_keys=OFF;
CREATE TABLE "new_node" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"name" TEXT NOT NULL,
"platform" INTEGER NOT NULL,
"date_created" DATETIME NOT NULL,
"identity" BLOB,
"node_peer_id" TEXT
);
INSERT INTO "new_node" ("date_created", "id", "name", "platform", "pub_id") SELECT "date_created", "id", "name", "platform", "pub_id" FROM "node";
DROP TABLE "node";
ALTER TABLE "new_node" RENAME TO "node";
CREATE UNIQUE INDEX "node_pub_id_key" ON "node"("pub_id");
PRAGMA foreign_key_check;
PRAGMA foreign_keys=ON;

View file

@ -52,11 +52,10 @@ model Node {
pub_id Bytes @unique
name String
// Enum: sd_core::node::Platform
platform Int @default(0)
// version String?
// last_seen DateTime @default(now())
// timezone String?
date_created DateTime @default(now())
platform Int
date_created DateTime
identity Bytes? // TODO: Change to required field in future
node_peer_id String? // TODO: Remove as part of - https://linear.app/spacedriveapp/issue/ENG-757/p2p-library-portability
jobs Job[]
Location Location[]

View file

@ -115,11 +115,11 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
match groups.entry(group_key) {
// Create new job group with metadata
Entry::Vacant(e) => {
let id = job.parent_id.clone().unwrap_or(job.id.clone());
let id = job.parent_id.unwrap_or(job.id);
let group = JobGroup {
id: id.to_string(),
action: action_name.clone(),
status: job.status.clone(),
status: job.status,
jobs: vec![report.clone()],
created_at: job.created_at.unwrap_or(Utc::now()),
};
@ -134,8 +134,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
}
}
let mut groups_vec: Vec<JobGroup> =
groups.into_iter().map(|(_, v)| v).collect();
let mut groups_vec: Vec<JobGroup> = groups.into_values().collect();
groups_vec.sort_by(|a, b| b.created_at.cmp(&a.created_at));
// Update the index after sorting the groups
@ -154,7 +153,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
})
.procedure("isActive", {
R.with2(library()).query(|(ctx, _), _: ()| async move {
Ok(ctx.jobs.get_running_reports().await.len() > 0)
Ok(!ctx.jobs.get_running_reports().await.is_empty())
})
})
.procedure("clear", {

View file

@ -1,6 +1,7 @@
use crate::{
library::LibraryConfig,
prisma::statistics,
util::MaybeUndefined,
volume::{get_volumes, save_volume},
};
@ -97,10 +98,10 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
let new_library = ctx
.library_manager
.create(LibraryConfig {
name: args.name.to_string(),
..Default::default()
})
.create(
LibraryConfig::new(args.name.to_string(), ctx.config.get().await.id),
ctx.config.get().await,
)
.await?;
Ok(new_library)
@ -111,7 +112,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
pub struct EditLibraryArgs {
pub id: Uuid,
pub name: Option<String>,
pub description: Option<String>,
pub description: MaybeUndefined<String>,
}
R.mutation(|ctx, args: EditLibraryArgs| async move {

View file

@ -28,6 +28,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
"error updating config".into(),
)
})
.map(|_| ())
})
})
}

View file

@ -7,7 +7,7 @@ use uuid::Uuid;
use crate::p2p::P2PEvent;
use super::{Ctx, R};
use super::{utils::library, Ctx, R};
pub(crate) fn mount() -> AlphaRouter<Ctx> {
R.router()
@ -74,4 +74,8 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
})
})
})
.procedure("pair", {
R.with2(library())
.mutation(|(ctx, lib), id: PeerId| async move { ctx.p2p.pair(id, lib) })
})
}

View file

@ -167,7 +167,7 @@ enum ObjectHiddenFilter {
}
impl ObjectHiddenFilter {
fn to_param(self) -> Option<object::WhereParam> {
fn to_param(&self) -> Option<object::WhereParam> {
match self {
ObjectHiddenFilter::Exclude => Some(object::hidden::not(Some(true))),
ObjectHiddenFilter::Include => None,

View file

@ -17,7 +17,7 @@ use std::{
};
use thiserror::Error;
use tokio::{fs, sync::broadcast};
use tracing::{debug, error, info, warn};
use tracing::{error, info, warn};
use tracing_appender::{
non_blocking::{NonBlocking, WorkerGuard},
rolling::{RollingFileAppender, Rotation},
@ -42,7 +42,6 @@ pub struct NodeContext {
pub jobs: Arc<JobManager>,
pub location_manager: Arc<LocationManager>,
pub event_bus_tx: broadcast::Sender<CoreEvent>,
pub p2p: Arc<P2PManager>,
}
pub struct Node {
@ -73,49 +72,26 @@ impl Node {
let jobs = JobManager::new();
let location_manager = LocationManager::new();
let (p2p, mut p2p_rx) = P2PManager::new(config.clone()).await?;
let library_manager = LibraryManager::new(
data_dir.join("libraries"),
NodeContext {
config: config.clone(),
jobs: jobs.clone(),
location_manager: location_manager.clone(),
p2p: p2p.clone(),
// p2p: p2p.clone(),
event_bus_tx: event_bus.0.clone(),
},
)
.await?;
let p2p = P2PManager::new(config.clone(), library_manager.clone()).await?;
#[cfg(debug_assertions)]
if let Some(init_data) = init_data {
init_data.apply(&library_manager).await?;
init_data
.apply(&library_manager, config.get().await)
.await?;
}
tokio::spawn({
let library_manager = library_manager.clone();
async move {
while let Ok((library_id, operations)) = p2p_rx.recv().await {
debug!("going to ingest {} operations", operations.len());
let Some(library) = library_manager.get_library(library_id).await else {
warn!("no library found!");
continue;
};
for op in operations {
library.sync.ingest_op(op).await.unwrap_or_else(|err| {
error!(
"error ingesting operation for library '{}': {err:?}",
library.id
);
});
}
}
}
});
let router = api::mount();
let node = Node {
data_dir: data_dir.to_path_buf(),
@ -239,7 +215,7 @@ pub enum NodeError {
#[error("failed to initialize p2p manager: {0}")]
P2PManager(#[from] sd_p2p::ManagerError),
#[error("invalid platform integer: {0}")]
InvalidPlatformInt(i32),
InvalidPlatformInt(u8),
#[cfg(debug_assertions)]
#[error("Init config error: {0}")]
InitConfig(#[from] util::debug_initializer::InitConfigError),

View file

@ -1,4 +1,9 @@
use std::{path::PathBuf, sync::Arc};
use sd_p2p::{spacetunnel::Identity, PeerId};
use sd_prisma::prisma::node;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use specta::Type;
use uuid::Uuid;
@ -11,27 +16,63 @@ use crate::{
};
/// LibraryConfig holds the configuration for a specific library. This is stored as a '{uuid}.sdlibrary' file.
#[derive(Debug, Serialize, Deserialize, Clone, Type, Default)]
#[derive(Debug, Serialize, Deserialize, Clone)] // If you are adding `specta::Type` on this your probably about to leak the P2P private key
pub struct LibraryConfig {
/// name is the display name of the library. This is used in the UI and is set by the user.
pub name: String,
/// description is a user set description of the library. This is used in the UI and is set by the user.
pub description: String,
pub description: Option<String>,
/// P2P identity of this library.
pub identity: Vec<u8>,
/// Id of the current node
pub node_id: Uuid,
// /// is_encrypted is a flag that is set to true if the library is encrypted.
// #[serde(default)]
// pub is_encrypted: bool,
}
#[derive(Debug, Serialize, Deserialize, Clone, Type)]
pub struct SanitisedLibraryConfig {
pub name: String,
pub description: Option<String>,
pub node_id: Uuid,
}
impl From<LibraryConfig> for SanitisedLibraryConfig {
fn from(config: LibraryConfig) -> Self {
Self {
name: config.name,
description: config.description,
node_id: config.node_id,
}
}
}
impl LibraryConfig {
pub fn new(name: String, node_id: Uuid) -> Self {
Self {
name,
description: None,
identity: Identity::new().to_bytes().to_vec(),
node_id,
}
}
}
#[async_trait::async_trait]
impl Migrate for LibraryConfig {
const CURRENT_VERSION: u32 = 1;
const CURRENT_VERSION: u32 = 4;
type Ctx = PrismaClient;
type Ctx = (Uuid, PeerId, Arc<PrismaClient>);
fn default(path: PathBuf) -> Result<Self, MigratorError> {
Err(MigratorError::ConfigFileMissing(path))
}
async fn migrate(
to_version: u32,
_config: &mut serde_json::Map<String, serde_json::Value>,
db: &Self::Ctx,
config: &mut serde_json::Map<String, serde_json::Value>,
(node_id, peer_id, db): &Self::Ctx,
) -> Result<(), MigratorError> {
match to_version {
0 => {}
@ -59,6 +100,40 @@ impl Migrate for LibraryConfig {
)
.await?;
}
2 => {
config.insert(
"identity".into(),
Value::Array(
Identity::new()
.to_bytes()
.into_iter()
.map(|v| v.into())
.collect(),
),
);
}
// The fact I have to migrate this hurts my soul
3 => {
if db.node().count(vec![]).exec().await? != 1 {
return Err(MigratorError::Custom(
"Ummm, there are too many nodes in the database, this should not happen!"
.into(),
));
}
db.node()
.update_many(
vec![],
vec![
node::pub_id::set(node_id.as_bytes().to_vec()),
node::node_peer_id::set(Some(peer_id.to_string())),
],
)
.exec()
.await?;
config.insert("node_id".into(), Value::String(node_id.to_string()));
}
v => unreachable!("Missing migration for library version {}", v),
}
@ -70,5 +145,5 @@ impl Migrate for LibraryConfig {
#[derive(Serialize, Deserialize, Debug, Type)]
pub struct LibraryConfigWrapped {
pub uuid: Uuid,
pub config: LibraryConfig,
pub config: SanitisedLibraryConfig,
}

View file

@ -20,6 +20,7 @@ use std::{
sync::Arc,
};
use sd_p2p::spacetunnel::Identity;
use tokio::{fs, io};
use tracing::warn;
use uuid::Uuid;
@ -44,6 +45,8 @@ pub struct Library {
pub node_local_id: i32,
/// node_context holds the node context for the node which this library is running on.
pub(super) node_context: NodeContext,
/// p2p identity
pub identity: Arc<Identity>,
pub orphan_remover: OrphanRemoverActor,
}

View file

@ -1,7 +1,7 @@
use crate::{
invalidate_query,
location::{indexer::rules, LocationManagerError},
node::Platform,
node::{NodeConfig, Platform},
object::orphan_remover::OrphanRemoverActor,
prisma::{location, node},
sync::{SyncManager, SyncMessage},
@ -9,24 +9,47 @@ use crate::{
db::{self, MissingFieldError},
error::{FileIOError, NonUtf8PathError},
migrator::{Migrate, MigratorError},
MaybeUndefined,
},
NodeContext,
};
use std::{
env,
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
};
use chrono::Local;
use sd_p2p::spacetunnel::{Identity, IdentityErr};
use thiserror::Error;
use tokio::{fs, io, sync::RwLock, try_join};
use tracing::{debug, error, warn};
use tokio::{
fs, io,
sync::{broadcast, RwLock},
try_join,
};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use super::{Library, LibraryConfig, LibraryConfigWrapped};
pub enum SubscriberEvent {
Load(Uuid, Arc<Identity>, broadcast::Receiver<SyncMessage>),
}
impl Clone for SubscriberEvent {
fn clone(&self) -> Self {
match self {
Self::Load(id, identity, receiver) => {
Self::Load(*id, identity.clone(), receiver.resubscribe())
}
}
}
}
pub trait SubscriberFn: Fn(SubscriberEvent) + Send + Sync + 'static {}
impl<F: Fn(SubscriberEvent) + Send + Sync + 'static> SubscriberFn for F {}
/// LibraryManager is a singleton that manages all libraries for a node.
pub struct LibraryManager {
/// libraries_dir holds the path to the directory where libraries are stored.
@ -35,6 +58,8 @@ pub struct LibraryManager {
libraries: RwLock<Vec<Library>>,
/// node_context holds the context for the node which this library manager is running on.
node_context: NodeContext,
/// on load subscribers
subscribers: RwLock<Vec<Box<dyn SubscriberFn>>>,
}
#[derive(Error, Debug)]
@ -65,6 +90,10 @@ pub enum LibraryManagerError {
NonUtf8Path(#[from] NonUtf8PathError),
#[error("failed to watch locations: {0}")]
LocationWatcher(#[from] LocationManagerError),
#[error("failed to parse library p2p identity: {0}")]
Identity(#[from] IdentityErr),
#[error("current node with id '{0}' was not found in the database")]
CurrentNodeNotFound(String),
#[error("missing-field: {0}")]
MissingField(#[from] MissingFieldError),
}
@ -89,6 +118,7 @@ impl LibraryManager {
.map_err(|e| FileIOError::from((&libraries_dir, e)))?;
let mut libraries = Vec::new();
let subscribers = RwLock::new(Vec::new());
let mut read_dir = fs::read_dir(&libraries_dir)
.await
.map_err(|e| FileIOError::from((&libraries_dir, e)))?;
@ -131,7 +161,14 @@ impl LibraryManager {
}
libraries.push(
Self::load(library_id, &db_path, config_path, node_context.clone()).await?,
Self::load(
library_id,
&db_path,
config_path,
node_context.clone(),
&subscribers,
)
.await?,
);
}
}
@ -140,6 +177,7 @@ impl LibraryManager {
libraries: RwLock::new(libraries),
libraries_dir,
node_context,
subscribers,
});
debug!("LibraryManager initialized");
@ -147,18 +185,33 @@ impl LibraryManager {
Ok(this)
}
/// subscribe to library events
pub(crate) async fn subscribe<F: SubscriberFn>(&self, f: F) {
self.subscribers.write().await.push(Box::new(f));
}
async fn emit(subscribers: &RwLock<Vec<Box<dyn SubscriberFn>>>, event: SubscriberEvent) {
let subscribers = subscribers.read().await;
for subscriber in subscribers.iter() {
subscriber(event.clone());
}
}
/// create creates a new library with the given config and mounts it into the running [LibraryManager].
pub(crate) async fn create(
&self,
config: LibraryConfig,
node_cfg: NodeConfig,
) -> Result<LibraryConfigWrapped, LibraryManagerError> {
self.create_with_uuid(Uuid::new_v4(), config).await
self.create_with_uuid(Uuid::new_v4(), config, node_cfg)
.await
}
pub(crate) async fn create_with_uuid(
&self,
id: Uuid,
config: LibraryConfig,
node_cfg: NodeConfig,
) -> Result<LibraryConfigWrapped, LibraryManagerError> {
if config.name.is_empty() || config.name.chars().all(|x| x.is_whitespace()) {
return Err(LibraryManagerError::InvalidConfig(
@ -180,9 +233,25 @@ impl LibraryManager {
self.libraries_dir.join(format!("{id}.db")),
config_path,
self.node_context.clone(),
&self.subscribers,
)
.await?;
// Create node
node::Create {
pub_id: config.node_id.as_bytes().to_vec(),
name: node_cfg.name.clone(),
platform: Platform::current() as i32,
date_created: Local::now().into(),
_params: vec![
node::identity::set(Some(config.identity.clone())),
node::node_peer_id::set(Some(node_cfg.keypair.peer_id().to_string())),
],
}
.to_query(&library.db)
.exec()
.await?;
debug!("Loaded library '{id:?}'");
// Run seeders
@ -196,7 +265,10 @@ impl LibraryManager {
debug!("Pushed library into manager '{id:?}'");
Ok(LibraryConfigWrapped { uuid: id, config })
Ok(LibraryConfigWrapped {
uuid: id,
config: config.into(),
})
}
pub(crate) async fn get_all_libraries_config(&self) -> Vec<LibraryConfigWrapped> {
@ -205,21 +277,17 @@ impl LibraryManager {
.await
.iter()
.map(|lib| LibraryConfigWrapped {
config: lib.config.clone(),
config: lib.config.clone().into(),
uuid: lib.id,
})
.collect()
}
// pub(crate) async fn get_all_libraries(&self) -> Vec<Library> {
// self.libraries.read().await.clone()
// }
pub(crate) async fn edit(
&self,
id: Uuid,
name: Option<String>,
description: Option<String>,
description: MaybeUndefined<String>,
) -> Result<(), LibraryManagerError> {
// check library is valid
let mut libraries = self.libraries.write().await;
@ -232,8 +300,10 @@ impl LibraryManager {
if let Some(name) = name {
library.config.name = name;
}
if let Some(description) = description {
library.config.description = description;
match description {
MaybeUndefined::Undefined => {}
MaybeUndefined::Null => library.config.description = None,
MaybeUndefined::Value(description) => library.config.description = Some(description),
}
LibraryConfig::save(
@ -313,11 +383,12 @@ impl LibraryManager {
}
/// load the library from a given path
pub(crate) async fn load(
async fn load(
id: Uuid,
db_path: impl AsRef<Path>,
config_path: PathBuf,
node_context: NodeContext,
subscribers: &RwLock<Vec<Box<dyn SubscriberFn>>>,
) -> Result<Library, LibraryManagerError> {
let db_path = db_path.as_ref();
let db_url = format!(
@ -328,50 +399,73 @@ impl LibraryManager {
);
let db = Arc::new(db::load_and_migrate(&db_url).await?);
let config = LibraryConfig::load_and_migrate(&config_path, &db).await?;
let node_config = node_context.config.get().await;
let config = LibraryConfig::load_and_migrate(
&config_path,
&(node_config.id, node_config.keypair.peer_id(), db.clone()),
)
.await?;
let identity = Arc::new(Identity::from_bytes(&config.identity)?);
let platform = match env::consts::OS {
"windows" => Platform::Windows,
"macos" => Platform::MacOS,
"linux" => Platform::Linux,
_ => Platform::Unknown,
};
let uuid_vec = id.as_bytes().to_vec();
let node_data = db
.node()
.upsert(
node::pub_id::equals(uuid_vec.clone()),
node::create(
uuid_vec,
node_config.name.clone(),
vec![node::platform::set(platform as i32)],
),
vec![node::name::set(node_config.name.clone())],
)
.find_unique(node::pub_id::equals(node_config.id.as_bytes().to_vec()))
.exec()
.await?;
.await?
.ok_or_else(|| LibraryManagerError::CurrentNodeNotFound(id.to_string()))?;
let curr_platform = Platform::current() as i32;
if node_data.platform != curr_platform {
info!(
"Detected change of platform for library '{}', was previously '{}' and will change to '{}'. Reconciling node data.",
id,
node_data.platform,
curr_platform
);
db.node()
.update(
node::pub_id::equals(node_data.pub_id.clone()),
vec![
node::platform::set(curr_platform),
node::name::set(node_config.name.clone()),
],
)
.exec()
.await?;
}
if node_data.name != node_config.name {
info!(
"Detected change of node name for library '{}', was previously '{}' and will change to '{}'. Reconciling node data.",
id,
node_data.name,
node_config.name,
);
db.node()
.update(
node::pub_id::equals(node_data.pub_id),
vec![node::name::set(node_config.name.clone())],
)
.exec()
.await?;
}
// TODO: Move this reconciliation into P2P and do reconciliation of both local and remote nodes.
// let key_manager = Arc::new(KeyManager::new(vec![]).await?);
// seed_keymanager(&db, &key_manager).await?;
rules::seeder(&db).await?;
let (sync_manager, mut sync_rx) = SyncManager::new(&db, id);
let (sync_manager, sync_rx) = SyncManager::new(&db, id);
tokio::spawn({
let node_context = node_context.clone();
async move {
while let Ok(op) = sync_rx.recv().await {
let SyncMessage::Created(op) = op else { continue; };
node_context.p2p.broadcast_sync_events(id, vec![op]).await;
}
}
});
Self::emit(
subscribers,
SubscriberEvent::Load(id, identity.clone(), sync_rx),
)
.await;
let library = Library {
id,
@ -383,6 +477,7 @@ impl LibraryManager {
db,
node_local_id: node_data.id,
node_context,
identity,
};
for location in library

View file

@ -59,7 +59,7 @@ impl StatefulJob for IndexerJob {
/// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`.
async fn init(
&self,
mut ctx: &mut WorkerContext,
ctx: &mut WorkerContext,
state: &mut JobState<Self>,
) -> Result<(), JobError> {
let location_id = state.init.location.id;
@ -109,7 +109,7 @@ impl StatefulJob for IndexerJob {
walk(
&to_walk_path,
&indexer_rules,
update_notifier_fn(BATCH_SIZE, &mut ctx),
update_notifier_fn(BATCH_SIZE, ctx),
file_paths_db_fetcher_fn!(&db),
to_remove_db_fetcher_fn!(location_id, location_path, &db),
iso_file_path_factory(location_id, location_path),
@ -146,7 +146,7 @@ impl StatefulJob for IndexerJob {
);
IndexerJobData::on_scan_progress(
&mut ctx,
ctx,
vec![ScanProgress::Message(format!(
"Starting saving {total_paths} files or directories, \
there still {to_walk_count} directories to index",
@ -176,7 +176,7 @@ impl StatefulJob for IndexerJob {
/// Process each chunk of entries in the indexer job, writing to the `file_path` table
async fn execute_step(
&self,
mut ctx: &mut WorkerContext,
ctx: &mut WorkerContext,
state: &mut JobState<Self>,
) -> Result<(), JobError> {
let data = extract_job_data_mut!(state);
@ -186,7 +186,7 @@ impl StatefulJob for IndexerJob {
let start_time = Instant::now();
IndexerJobData::on_scan_progress(
&mut ctx,
ctx,
vec![
ScanProgress::SavedChunks(step.chunk_idx),
ScanProgress::Message(format!(
@ -221,7 +221,7 @@ impl StatefulJob for IndexerJob {
keep_walking(
to_walk_entry,
&data.indexer_rules,
update_notifier_fn(BATCH_SIZE, &mut ctx),
update_notifier_fn(BATCH_SIZE, ctx),
file_paths_db_fetcher_fn!(&db),
to_remove_db_fetcher_fn!(location_id, location_path, &db),
iso_file_path_factory(location_id, location_path),

View file

@ -15,7 +15,7 @@ use crate::util::migrator::{Migrate, MigratorError};
pub const NODE_STATE_CONFIG_NAME: &str = "node_state.sdconfig";
/// NodeConfig is the configuration for a node. This is shared between all libraries and is stored in a JSON file on disk.
#[derive(Debug, Serialize, Deserialize, Clone, Type)]
#[derive(Debug, Serialize, Deserialize, Clone)] // If you are adding `specta::Type` on this your probably about to leak the P2P private key
pub struct NodeConfig {
/// id is a unique identifier for the current node. Each node has a public identifier (this one) and is given a local id for each library (done within the library code).
pub id: Uuid,
@ -24,7 +24,7 @@ pub struct NodeConfig {
// the port this node uses for peer to peer communication. By default a random free port will be chosen each time the application is started.
pub p2p_port: Option<u32>,
/// The p2p identity keypair for this node. This is used to identify the node on the network.
#[specta(skip)]
/// This keypair does effectively nothing except for provide libp2p with a stable peer_id.
pub keypair: Keypair,
// TODO: These will probs be replaced by your Spacedrive account in the near future.
pub p2p_email: Option<String>,
@ -63,6 +63,24 @@ impl Migrate for NodeConfig {
type Ctx = ();
fn default(_path: PathBuf) -> Result<Self, MigratorError> {
Ok(Self {
id: Uuid::new_v4(),
name: match hostname::get() {
// SAFETY: This is just for display purposes so it doesn't matter if it's lossy
Ok(hostname) => hostname.to_string_lossy().into_owned(),
Err(err) => {
eprintln!("Falling back to default node name as an error occurred getting your systems hostname: '{err}'");
"my-spacedrive".into()
}
},
p2p_port: None,
keypair: Keypair::generate(),
p2p_email: None,
p2p_img_url: None,
})
}
async fn migrate(
from_version: u32,
_config: &mut Map<String, Value>,

View file

@ -1,35 +1,13 @@
use crate::{prisma::node, NodeError};
use crate::NodeError;
use serde::{Deserialize, Serialize};
use specta::Type;
use uuid::Uuid;
mod config;
pub mod peer_request;
pub use config::*;
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
pub struct LibraryNode {
pub uuid: Uuid,
pub name: String,
pub platform: Platform,
}
impl TryFrom<node::Data> for LibraryNode {
type Error = String;
fn try_from(data: node::Data) -> Result<Self, Self::Error> {
Ok(Self {
uuid: Uuid::from_slice(&data.pub_id).map_err(|_| "Invalid node pub_id")?,
name: data.name,
platform: Platform::try_from(data.platform).map_err(|_| "Invalid platform_id")?,
})
}
}
#[allow(clippy::upper_case_acronyms)]
#[repr(i32)]
#[repr(u8)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Type, Eq, PartialEq)]
pub enum Platform {
Unknown = 0,
@ -40,10 +18,32 @@ pub enum Platform {
Android = 5,
}
impl TryFrom<i32> for Platform {
impl Platform {
#[allow(unreachable_code)]
pub fn current() -> Self {
#[cfg(target_os = "windows")]
return Self::Windows;
#[cfg(target_os = "macos")]
return Self::MacOS;
#[cfg(target_os = "linux")]
return Self::Linux;
#[cfg(target_os = "ios")]
return Self::IOS;
#[cfg(target_os = "android")]
return Self::Android;
Self::Unknown
}
}
impl TryFrom<u8> for Platform {
type Error = NodeError;
fn try_from(value: i32) -> Result<Self, Self::Error> {
fn try_from(value: u8) -> Result<Self, Self::Error> {
let s = match value {
0 => Self::Unknown,
1 => Self::Windows,

View file

@ -1,154 +0,0 @@
#![allow(dead_code, unused_variables, clippy::panic, clippy::unwrap_used)] // TODO: Reenable once this is working
use serde::{Deserialize, Serialize};
use specta::Type;
use tokio::sync::{mpsc, oneshot};
pub enum PeerRequest {
Guest(guest::PeerRequest),
Host(host::PeerRequest),
}
enum PlaceholderP2PAction {
SubmitPeeringPassword {
peer_id: String,
password: String,
tx: oneshot::Sender<bool>,
},
}
pub mod guest {
use super::*;
#[derive(Type, Deserialize)]
pub enum Action {
PromptPassword,
ProcessPassword { password: String },
}
#[derive(Type, Serialize, Clone)]
pub enum State {
Start,
AwaitingPassword { prev_invalid: bool },
AwaitingConfirmation,
ChallengeSuccess,
}
pub struct PeerRequest {
pub tx: mpsc::Sender<Action>,
pub peer_id: String,
}
struct ActorArgs {
peer_id: String,
p2p: mpsc::Sender<PlaceholderP2PAction>,
}
async fn loop_until<T, R>(rx: &mut mpsc::Receiver<T>, func: impl Fn(T) -> Option<R>) -> R {
loop {
let Some(msg) = rx.recv().await else {
panic!()
};
if let Some(d) = func(msg) {
break d;
}
}
}
impl PeerRequest {
pub fn new_actor(peer_id: String) -> (Self, mpsc::Receiver<State>) {
let (itx, irx) = mpsc::channel(8);
let (otx, orx) = mpsc::channel(8);
let (p2ptx, _) = mpsc::channel(8);
tokio::spawn(Self::actor(
otx,
irx,
ActorArgs {
peer_id: peer_id.clone(),
p2p: p2ptx,
},
));
(Self { tx: itx, peer_id }, orx)
}
async fn actor(
state_tx: mpsc::Sender<State>,
mut action_rx: mpsc::Receiver<Action>,
ActorArgs { peer_id, p2p }: ActorArgs,
) {
let send_state = |state| async { state_tx.send(state).await.ok() };
send_state(State::Start).await;
loop_until(&mut action_rx, |msg| {
matches!(Action::PromptPassword, msg).then_some(())
})
.await;
send_state(State::AwaitingPassword {
prev_invalid: false,
})
.await;
loop {
let password = loop_until(&mut action_rx, |msg| match msg {
Action::ProcessPassword { password } => Some(password),
_ => None,
})
.await;
let (tx, rx) = oneshot::channel();
p2p.send(PlaceholderP2PAction::SubmitPeeringPassword {
peer_id: peer_id.clone(),
password,
tx,
})
.await
.ok();
if rx.await.unwrap() {
break;
}
send_state(State::AwaitingPassword { prev_invalid: true }).await;
}
send_state(State::ChallengeSuccess).await;
}
pub async fn submit_password(&self, password: String) {
self.tx
.send(Action::ProcessPassword { password })
.await
.ok();
}
}
}
pub mod host {
use super::*;
#[derive(Type, Deserialize)]
pub enum Action {
PromptPassword,
ProcessPassword { password: String },
}
#[derive(Type, Serialize, Clone)]
pub enum State {
AwaitingResponse,
ChallengeReceived,
}
pub struct PeerRequest {
pub tx: mpsc::Sender<Action>,
pub peer_id: String,
}
// impl PeerRequest {
// pub fn new_actor() -> (Self, mpsc::Receiver<State>) {}
// }
}

View file

@ -264,7 +264,7 @@ async fn process_step(
}
Ok(())
}
Err(e) => Err(e.into()),
Err(e) => Err(e),
}
}

View file

@ -1,3 +1,5 @@
#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Remove once this is fully stablised
mod p2p_manager;
mod peer_metadata;
mod protocol;

View file

@ -1,19 +1,24 @@
#![allow(clippy::unwrap_used)] // TODO: Remove once this is fully stablised
use std::{
borrow::Cow,
collections::HashMap,
path::PathBuf,
sync::Arc,
str::FromStr,
sync::{
atomic::{AtomicU16, Ordering},
Arc,
},
time::{Duration, Instant},
};
use chrono::Utc;
use futures::Stream;
use sd_p2p::{
spaceblock::{BlockSize, SpacedropRequest, Transfer},
spaceblock::{BlockSize, SpaceblockRequest, Transfer},
spacetime::SpaceTimeStream,
spacetunnel::{Identity, Tunnel},
Event, Manager, ManagerError, MetadataManager, PeerId,
};
use sd_prisma::prisma::node;
use sd_sync::CRDTOperation;
use serde::Serialize;
use specta::Type;
@ -23,12 +28,14 @@ use tokio::{
sync::{broadcast, oneshot, Mutex},
time::sleep,
};
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::{
node::{NodeConfig, NodeConfigManager},
p2p::{OperatingSystem, SPACEDRIVE_APP_ID},
library::{Library, LibraryManager, SubscriberEvent},
node::{NodeConfig, NodeConfigManager, Platform},
p2p::{NodeInformation, OperatingSystem, SyncRequestError, SPACEDRIVE_APP_ID},
sync::SyncMessage,
};
use super::{Header, PeerMetadata};
@ -58,12 +65,15 @@ pub struct P2PManager {
spacedrop_pairing_reqs: Arc<Mutex<HashMap<Uuid, oneshot::Sender<Option<String>>>>>,
pub metadata_manager: Arc<MetadataManager<PeerMetadata>>,
pub spacedrop_progress: Arc<Mutex<HashMap<Uuid, broadcast::Sender<u8>>>>,
pairing_id: AtomicU16,
library_manager: Arc<LibraryManager>,
}
impl P2PManager {
pub async fn new(
node_config: Arc<NodeConfigManager>,
) -> Result<(Arc<Self>, broadcast::Receiver<(Uuid, Vec<CRDTOperation>)>), ManagerError> {
library_manager: Arc<LibraryManager>,
) -> Result<Arc<Self>, ManagerError> {
let (config, keypair) = {
let config = node_config.get().await;
(Self::config_to_metadata(&config), config.keypair)
@ -82,16 +92,15 @@ impl P2PManager {
// need to keep 'rx' around so that the channel isn't dropped
let (tx, rx) = broadcast::channel(100);
let (tx2, rx2) = broadcast::channel(100);
let spacedrop_pairing_reqs = Arc::new(Mutex::new(HashMap::new()));
let spacedrop_progress = Arc::new(Mutex::new(HashMap::new()));
tokio::spawn({
let events = tx.clone();
// let sync_events = tx2.clone();
let spacedrop_pairing_reqs = spacedrop_pairing_reqs.clone();
let spacedrop_progress = spacedrop_progress.clone();
let library_manager = library_manager.clone();
async move {
let mut shutdown = false;
@ -117,9 +126,9 @@ impl P2PManager {
}
Event::PeerMessage(mut event) => {
let events = events.clone();
let sync_events = tx2.clone();
let spacedrop_pairing_reqs = spacedrop_pairing_reqs.clone();
let spacedrop_progress = spacedrop_progress.clone();
let library_manager = library_manager.clone();
tokio::spawn(async move {
let header = Header::from_stream(&mut event.stream).await.unwrap();
@ -150,11 +159,14 @@ impl P2PManager {
.await
.insert(id, process_tx.clone());
if let Err(_) = events.send(P2PEvent::SpacedropRequest {
id,
peer_id: event.peer_id,
name: req.name.clone(),
}) {
if events
.send(P2PEvent::SpacedropRequest {
id,
peer_id: event.peer_id,
name: req.name.clone(),
})
.is_err()
{
// No frontend's are active
todo!("Outright reject Spacedrop");
@ -163,8 +175,6 @@ impl P2PManager {
tokio::select! {
_ = sleep(SPACEDROP_TIMEOUT) => {
info!("spacedrop({id}): timeout, rejecting!");
return;
}
file_path = rx => {
match file_path {
@ -183,26 +193,123 @@ impl P2PManager {
}
Ok(None) => {
info!("spacedrop({id}): rejected");
return;
}
Err(_) => {
info!("spacedrop({id}): error with Spacedrop pairing request receiver!");
return;
}
}
}
};
}
Header::Sync(library_id, len) => {
Header::Pair(library_id) => {
let mut stream = match event.stream {
SpaceTimeStream::Unicast(stream) => stream,
_ => {
// TODO: Return an error to the remote client
error!("Received Spacedrop request from peer '{}' but it's not a unicast stream!", event.peer_id);
return;
}
};
info!(
"Starting pairing with '{}' for library '{library_id}'",
event.peer_id
);
// TODO: Authentication and security stuff
let library =
library_manager.get_library(library_id).await.unwrap();
debug!("Waiting for nodeinfo from the remote node");
let remote_info = NodeInformation::from_stream(&mut stream)
.await
.unwrap();
debug!(
"Received nodeinfo from the remote node: {:?}",
remote_info
);
debug!("Creating node in database");
node::Create {
pub_id: remote_info.pub_id.as_bytes().to_vec(),
name: remote_info.name,
platform: remote_info.platform as i32,
date_created: Utc::now().into(),
_params: vec![
node::identity::set(Some(
remote_info.public_key.to_bytes().to_vec(),
)),
node::node_peer_id::set(Some(
event.peer_id.to_string(),
)),
],
}
// TODO: Should this be in a transaction in case it fails?
.to_query(&library.db)
.exec()
.await
.unwrap();
let info = NodeInformation {
pub_id: library.config.node_id,
name: library.config.name,
public_key: library.identity.to_remote_identity(),
platform: Platform::current(),
};
debug!("Sending nodeinfo to the remote node");
stream.write_all(&info.to_bytes()).await.unwrap();
info!(
"Paired with '{}' for library '{library_id}'",
remote_info.pub_id
); // TODO: Use hash of identity cert here cause pub_id can be forged
}
Header::Sync(library_id) => {
let stream = match event.stream {
SpaceTimeStream::Unicast(stream) => stream,
_ => {
// TODO: Return an error to the remote client
error!("Received Spacedrop request from peer '{}' but it's not a unicast stream!", event.peer_id);
return;
}
};
let mut stream = Tunnel::from_stream(stream).await.unwrap();
let mut len = [0; 4];
stream
.read_exact(&mut len)
.await
.map_err(SyncRequestError::PayloadLenIoError)
.unwrap();
let len = u32::from_le_bytes(len);
let mut buf = vec![0; len as usize]; // TODO: Designed for easily being able to be DOS the current Node
event.stream.read_exact(&mut buf).await.unwrap();
stream.read_exact(&mut buf).await.unwrap();
let mut buf: &[u8] = &buf;
let operations = rmp_serde::from_read(&mut buf).unwrap();
let operations: Vec<CRDTOperation> =
rmp_serde::from_read(&mut buf).unwrap();
println!("Received sync events for library '{library_id}': {operations:?}");
debug!("ingesting sync events for library '{library_id}': {operations:?}");
sync_events.send((library_id, operations)).unwrap();
let Some(library) = library_manager.get_library(library_id).await else {
warn!("error ingesting sync messages. no library by id '{library_id}' found!");
return;
};
for op in operations {
library.sync.ingest_op(op).await.unwrap_or_else(
|err| {
error!(
"error ingesting operation for library '{}': {err:?}",
library.id
);
},
);
}
}
}
});
@ -233,8 +340,29 @@ impl P2PManager {
spacedrop_pairing_reqs,
metadata_manager,
spacedrop_progress,
pairing_id: AtomicU16::new(0),
library_manager: library_manager.clone(),
});
library_manager
.subscribe({
let this = this.clone();
move |event| match event {
SubscriberEvent::Load(library_id, library_identity, mut sync_rx) => {
let this = this.clone();
tokio::spawn(async move {
while let Ok(op) = sync_rx.recv().await {
let SyncMessage::Created(op) = op else { continue; };
this.broadcast_sync_events(library_id, &library_identity, vec![op])
.await;
}
});
}
}
})
.await;
// TODO: Probs remove this once connection timeout/keepalive are working correctly
tokio::spawn({
let this = this.clone();
@ -246,7 +374,7 @@ impl P2PManager {
}
});
Ok((this, rx2))
Ok(this)
}
fn config_to_metadata(config: &NodeConfig) -> PeerMetadata {
@ -259,6 +387,7 @@ impl P2PManager {
}
}
#[allow(unused)] // TODO: Should probs be using this
pub async fn update_metadata(&self, node_config_manager: &NodeConfigManager) {
self.metadata_manager
.update(Self::config_to_metadata(&node_config_manager.get().await));
@ -280,8 +409,69 @@ impl P2PManager {
self.events.0.subscribe()
}
#[allow(unused)] // TODO: Remove `allow(unused)` once integrated
pub async fn broadcast_sync_events(&self, library_id: Uuid, event: Vec<CRDTOperation>) {
pub fn pair(&self, peer_id: PeerId, lib: Library) -> u16 {
let pairing_id = self.pairing_id.fetch_add(1, Ordering::SeqCst);
let manager = self.manager.clone();
tokio::spawn(async move {
info!(
"Started pairing session '{pairing_id}' with peer '{peer_id}' for library '{}'",
lib.id
);
let mut stream = manager.stream(peer_id).await.unwrap();
let header = Header::Pair(lib.id);
stream.write_all(&header.to_bytes()).await.unwrap();
// TODO: Apply some security here cause this is so open to MITM
// TODO: Signing and a SPAKE style pin prompt
let info = NodeInformation {
pub_id: lib.config.node_id,
name: lib.config.name,
public_key: lib.identity.to_remote_identity(),
platform: Platform::current(),
};
debug!("Sending nodeinfo to remote node");
stream.write_all(&info.to_bytes()).await.unwrap();
debug!("Waiting for nodeinfo from the remote node");
let remote_info = NodeInformation::from_stream(&mut stream).await.unwrap();
debug!("Received nodeinfo from the remote node: {:?}", remote_info);
node::Create {
pub_id: remote_info.pub_id.as_bytes().to_vec(),
name: remote_info.name,
platform: remote_info.platform as i32,
date_created: Utc::now().into(),
_params: vec![
node::identity::set(Some(remote_info.public_key.to_bytes().to_vec())),
node::node_peer_id::set(Some(peer_id.to_string())),
],
}
// TODO: Should this be in a transaction in case it fails?
.to_query(&lib.db)
.exec()
.await
.unwrap();
info!(
"Paired with '{}' for library '{}'",
remote_info.pub_id, lib.id
); // TODO: Use hash of identity cert here cause pub_id can be forged
});
pairing_id
}
pub async fn broadcast_sync_events(
&self,
library_id: Uuid,
_identity: &Identity,
event: Vec<CRDTOperation>,
) {
let mut buf = match rmp_serde::to_vec_named(&event) {
Ok(buf) => buf,
Err(e) => {
@ -289,12 +479,43 @@ impl P2PManager {
return;
}
};
let mut head_buf = Header::Sync(library_id, buf.len() as u32).to_bytes(); // Max Sync payload is like 4GB
let mut head_buf = Header::Sync(library_id).to_bytes(); // Max Sync payload is like 4GB
head_buf.extend_from_slice(&(buf.len() as u32).to_le_bytes());
head_buf.append(&mut buf);
debug!("broadcasting sync events. payload_len={}", buf.len());
// TODO: Determine which clients we share that library with
self.manager.broadcast(head_buf).await;
// TODO: Establish a connection to them
let library = self.library_manager.get_library(library_id).await.unwrap();
// TODO: probs cache this query in memory cause this is gonna be stupid frequent
let target_nodes = library
.db
.node()
.find_many(vec![])
.exec()
.await
.unwrap()
.into_iter()
.map(|n| {
PeerId::from_str(&n.node_peer_id.expect("Node was missing 'node_peer_id'!"))
.unwrap()
})
.collect::<Vec<_>>();
info!(
"Sending sync messages for library '{}' to nodes with peer id's '{:?}'",
library_id, target_nodes
);
// TODO: Do in parallel
for peer_id in target_nodes {
let stream = self.manager.stream(peer_id).await.map_err(|_| ()).unwrap(); // TODO: handle providing incorrect peer id
let mut tunnel = Tunnel::from_stream(stream).await.unwrap();
tunnel.write_all(&head_buf).await.unwrap();
}
}
pub async fn ping(&self) {
@ -314,7 +535,7 @@ impl P2PManager {
let file = File::open(&path).await.map_err(|_| ())?;
let metadata = file.metadata().await.map_err(|_| ())?;
let header = Header::Spacedrop(SpacedropRequest {
let header = Header::Spacedrop(SpaceblockRequest {
name: path
.file_name()
.map(|v| v.to_string_lossy())

View file

@ -1,18 +1,24 @@
use std::string::FromUtf8Error;
use thiserror::Error;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncRead, AsyncReadExt};
use uuid::Uuid;
use sd_p2p::{
spaceblock::{SpacedropRequest, SpacedropRequestError},
spaceblock::{SpaceblockRequest, SpacedropRequestError},
spacetime::SpaceTimeStream,
spacetunnel::{IdentityErr, RemoteIdentity},
};
use crate::node::Platform;
/// TODO
#[derive(Debug, PartialEq, Eq)]
pub enum Header {
Ping,
Spacedrop(SpacedropRequest),
Sync(Uuid, u32),
Spacedrop(SpaceblockRequest),
Pair(Uuid),
Sync(Uuid),
}
#[derive(Debug, Error)]
@ -49,7 +55,7 @@ impl Header {
match discriminator {
0 => match stream {
SpaceTimeStream::Unicast(stream) => Ok(Self::Spacedrop(
SpacedropRequest::from_stream(stream).await?,
SpaceblockRequest::from_stream(stream).await?,
)),
_ => Err(HeaderError::SpacedropOverMulticastIsForbidden),
},
@ -61,16 +67,19 @@ impl Header {
.await
.map_err(SyncRequestError::LibraryIdIoError)?;
let mut len = [0; 4];
Ok(Self::Pair(
Uuid::from_slice(&uuid).map_err(SyncRequestError::ErrorDecodingLibraryId)?,
))
}
3 => {
let mut uuid = [0u8; 16];
stream
.read_exact(&mut len)
.read_exact(&mut uuid)
.await
.map_err(SyncRequestError::PayloadLenIoError)?;
let len = u32::from_le_bytes(len);
.map_err(SyncRequestError::LibraryIdIoError)?;
Ok(Self::Sync(
Uuid::from_slice(&uuid).map_err(SyncRequestError::ErrorDecodingLibraryId)?,
len,
))
}
d => Err(HeaderError::InvalidDiscriminator(d)),
@ -85,41 +94,176 @@ impl Header {
bytes
}
Self::Ping => vec![1],
Self::Sync(uuid, len) => {
Self::Pair(library_id) => {
let mut bytes = vec![2];
bytes.extend_from_slice(library_id.as_bytes());
bytes
}
Self::Sync(uuid) => {
let mut bytes = vec![3];
bytes.extend_from_slice(uuid.as_bytes());
let len_buf = len.to_le_bytes();
debug_assert_eq!(len_buf.len(), 4); // TODO: Is this bad because `len` is usize??
bytes.extend_from_slice(&len_buf);
bytes
}
}
}
}
// TODO: Unit test it because binary protocols are error prone
// #[cfg(test)]
// mod tests {
// use super::*;
#[derive(Debug, Error)]
pub enum NodeInformationError {
#[error("io error decoding node information library pub_id: {0}")]
ErrorDecodingUuid(std::io::Error),
#[error("error formatting node information library pub_id: {0}")]
UuidFormatError(uuid::Error),
#[error("io error reading node information library name length: {0}")]
NameLenIoError(std::io::Error),
#[error("io error decoding node information library name: {0}")]
ErrorDecodingName(std::io::Error),
#[error("error formatting node information library name: {0}")]
NameFormatError(FromUtf8Error),
#[error("io error reading node information public key length: {0}")]
PublicKeyLenIoError(std::io::Error),
#[error("io error decoding node information public key: {0}")]
ErrorDecodingPublicKey(std::io::Error),
#[error("error decoding public key: {0}")]
ErrorParsingPublicKey(#[from] IdentityErr),
#[error("io error reading node information platform id: {0}")]
PlatformIdError(std::io::Error),
}
// #[test]
// fn test_proto() {
// assert_eq!(
// Header::from_bytes(&Header::Ping.to_bytes()),
// Ok(Header::Ping)
// );
/// is shared between nodes during pairing and contains the information to identify the node.
#[derive(Debug, PartialEq, Eq)]
pub struct NodeInformation {
pub pub_id: Uuid,
pub name: String,
pub public_key: RemoteIdentity,
pub platform: Platform,
}
// assert_eq!(
// Header::from_bytes(&Header::Spacedrop.to_bytes()),
// Ok(Header::Spacedrop)
// );
impl NodeInformation {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> Result<Self, NodeInformationError> {
let pub_id = {
let mut buf = vec![0u8; 16];
stream
.read_exact(&mut buf)
.await
.map_err(NodeInformationError::ErrorDecodingUuid)?;
// let uuid = Uuid::new_v4();
// assert_eq!(
// Header::from_bytes(&Header::Sync(uuid).to_bytes()),
// Ok(Header::Sync(uuid))
// );
// }
// }
Uuid::from_slice(&buf).map_err(NodeInformationError::UuidFormatError)?
};
let name = {
let len = stream
.read_u16_le()
.await
.map_err(NodeInformationError::NameLenIoError)?;
let mut buf = vec![0u8; len as usize];
stream
.read_exact(&mut buf)
.await
.map_err(NodeInformationError::ErrorDecodingName)?;
String::from_utf8(buf).map_err(NodeInformationError::NameFormatError)?
};
let public_key = {
let len = stream
.read_u16_le()
.await
.map_err(NodeInformationError::PublicKeyLenIoError)?;
let mut buf = vec![0u8; len as usize];
stream
.read_exact(&mut buf)
.await
.map_err(NodeInformationError::ErrorDecodingPublicKey)?;
RemoteIdentity::from_bytes(&buf)?
};
let platform = stream
.read_u8()
.await
.map_err(NodeInformationError::PlatformIdError)?;
Ok(Self {
pub_id,
name,
public_key,
platform: Platform::try_from(platform).unwrap_or(Platform::Unknown),
})
}
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::new();
// Pub id
buf.extend(self.pub_id.as_bytes());
// Name
let len_buf = (self.name.len() as u16).to_le_bytes();
if self.name.len() > u16::MAX as usize {
panic!("Name is too long!"); // TODO: Error handling
}
buf.extend_from_slice(&len_buf);
buf.extend(self.name.as_bytes());
// Public key // TODO: Can I use a fixed size array?
let pk = self.public_key.to_bytes();
let len_buf = (pk.len() as u16).to_le_bytes();
if pk.len() > u16::MAX as usize {
panic!("Public key is too long!"); // TODO: Error handling
}
buf.extend_from_slice(&len_buf);
buf.extend(pk);
// Platform
buf.push(self.platform as u8);
buf
}
}
#[cfg(test)]
mod tests {
use super::*;
use sd_p2p::spacetunnel::Identity;
#[tokio::test]
async fn test_node_information() {
let original = NodeInformation {
pub_id: Uuid::new_v4(),
name: "Name".into(),
public_key: Identity::new().to_remote_identity(),
platform: Platform::current(),
};
let buf = original.to_bytes();
let mut cursor = std::io::Cursor::new(buf);
let info = NodeInformation::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, info);
}
// TODO: Unit test it because binary protocols are error prone
// #[test]
// fn test_proto() {
// assert_eq!(
// Header::from_bytes(&Header::Ping.to_bytes()),
// Ok(Header::Ping)
// );
// assert_eq!(
// Header::from_bytes(&Header::Spacedrop.to_bytes()),
// Ok(Header::Spacedrop)
// );
// let uuid = Uuid::new_v4();
// assert_eq!(
// Header::from_bytes(&Header::Sync(uuid).to_bytes()),
// Ok(Header::Sync(uuid))
// );
// }
}

View file

@ -157,14 +157,15 @@ impl SyncManager {
pub async fn ingest_op(&self, op: CRDTOperation) -> prisma_client_rust::Result<()> {
let db = &self.db;
db.node()
.upsert(
node::pub_id::equals(op.node.as_bytes().to_vec()),
node::create(op.node.as_bytes().to_vec(), "TEMP".to_string(), vec![]),
vec![],
)
if db
.node()
.find_unique(node::pub_id::equals(op.node.as_bytes().to_vec()))
.exec()
.await?;
.await?
.is_none()
{
panic!("Node is not paired!")
}
let msg = SyncMessage::Ingested(op.clone());
@ -294,7 +295,6 @@ impl SyncManager {
.await?;
}
},
_ => todo!(),
}
if let CRDTOperationType::Shared(shared_op) = op.typ {

View file

@ -101,7 +101,7 @@ impl<'a, T> OptionalField for &'a Option<T> {
}
}
pub fn maybe_missing<'a, T: OptionalField>(
pub fn maybe_missing<T: OptionalField>(
data: T,
field: &'static str,
) -> Result<T::Out, MissingFieldError> {

View file

@ -12,10 +12,12 @@ use crate::{
location::{
delete_location, scan_location, LocationCreateArgs, LocationError, LocationManagerError,
},
node::NodeConfig,
prisma::location,
util::AbortOnDrop,
};
use prisma_client_rust::QueryError;
use sd_p2p::spacetunnel::Identity;
use serde::Deserialize;
use thiserror::Error;
use tokio::{
@ -93,7 +95,11 @@ impl InitConfig {
Ok(None)
}
pub async fn apply(self, library_manager: &LibraryManager) -> Result<(), InitConfigError> {
pub async fn apply(
self,
library_manager: &LibraryManager,
node_cfg: NodeConfig,
) -> Result<(), InitConfigError> {
info!("Initializing app from file: {:?}", self.path);
for lib in self.libraries {
@ -108,13 +114,17 @@ impl InitConfig {
let library = match library_manager.get_library(lib.id).await {
Some(lib) => lib,
None => {
let node_pub_id = Uuid::new_v4();
let library = library_manager
.create_with_uuid(
lib.id,
LibraryConfig {
name: lib.name,
description: lib.description.unwrap_or("".to_string()),
description: lib.description,
identity: Identity::new().to_bytes(),
node_id: node_pub_id,
},
node_cfg.clone(),
)
.await?;

View file

@ -0,0 +1,58 @@
//! Copied from: https://docs.rs/async-graphql/latest/async_graphql/types/enum.MaybeUndefined.html
#![allow(unused)]
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use specta::Type;
#[derive(Debug, Clone, Type)]
#[specta(untagged)]
pub enum MaybeUndefined<T> {
Undefined,
Null,
Value(T),
}
impl<T, E> MaybeUndefined<Result<T, E>> {
/// Transposes a `MaybeUndefined` of a [`Result`] into a [`Result`] of a
/// `MaybeUndefined`.
///
/// [`MaybeUndefined::Undefined`] will be mapped to
/// [`Ok`]`(`[`MaybeUndefined::Undefined`]`)`. [`MaybeUndefined::Null`]
/// will be mapped to [`Ok`]`(`[`MaybeUndefined::Null`]`)`.
/// [`MaybeUndefined::Value`]`(`[`Ok`]`(_))` and
/// [`MaybeUndefined::Value`]`(`[`Err`]`(_))` will be mapped to
/// [`Ok`]`(`[`MaybeUndefined::Value`]`(_))` and [`Err`]`(_)`.
#[inline]
pub fn transpose(self) -> Result<MaybeUndefined<T>, E> {
match self {
MaybeUndefined::Undefined => Ok(MaybeUndefined::Undefined),
MaybeUndefined::Null => Ok(MaybeUndefined::Null),
MaybeUndefined::Value(Ok(v)) => Ok(MaybeUndefined::Value(v)),
MaybeUndefined::Value(Err(e)) => Err(e),
}
}
}
impl<T: Serialize> Serialize for MaybeUndefined<T> {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
match self {
MaybeUndefined::Value(value) => value.serialize(serializer),
_ => serializer.serialize_none(),
}
}
}
impl<'de, T> Deserialize<'de> for MaybeUndefined<T>
where
T: Deserialize<'de>,
{
fn deserialize<D>(deserializer: D) -> Result<MaybeUndefined<T>, D::Error>
where
D: Deserializer<'de>,
{
Option::<T>::deserialize(deserializer).map(|value| match value {
Some(value) => MaybeUndefined::Value(value),
None => MaybeUndefined::Null,
})
}
}

View file

@ -2,7 +2,7 @@ use std::{
any::type_name,
fs::File,
io::{self, BufReader, Seek, Write},
path::Path,
path::{Path, PathBuf},
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
@ -23,11 +23,13 @@ pub struct BaseConfig {
/// System for managing app level migrations on a config file so we can introduce breaking changes to the app without the user needing to reset their whole system.
#[async_trait::async_trait]
pub trait Migrate: Sized + DeserializeOwned + Serialize + Default {
pub trait Migrate: Sized + DeserializeOwned + Serialize {
const CURRENT_VERSION: u32;
type Ctx: Sync;
fn default(path: PathBuf) -> Result<Self, MigratorError>;
async fn migrate(
from_version: u32,
config: &mut Map<String, Value>,
@ -90,7 +92,7 @@ pub trait Migrate: Sized + DeserializeOwned + Serialize + Default {
Ok(serde_json::from_value(Value::Object(cfg.other))?)
}
false => Ok(serde_json::from_value(Value::Object(
Self::default().save(path)?.other,
Self::default(path.into())?.save(path)?.other,
))?),
}
}
@ -128,6 +130,8 @@ pub enum MigratorError {
Database(#[from] prisma_client_rust::QueryError),
#[error("We detected a Spacedrive config from a super early version of the app!")]
HasSuperLegacyConfig,
#[error("file '{}' was not found by the migrator!", .0.display())]
ConfigFileMissing(PathBuf),
#[error("custom migration error: {0}")]
Custom(String),
}
@ -154,6 +158,10 @@ mod test {
type Ctx = ();
fn default(_path: PathBuf) -> Result<Self, MigratorError> {
Ok(<Self as Default>::default())
}
async fn migrate(
to_version: u32,
config: &mut Map<String, Value>,

View file

@ -3,7 +3,9 @@ pub mod db;
#[cfg(debug_assertions)]
pub mod debug_initializer;
pub mod error;
mod maybe_undefined;
pub mod migrator;
pub mod version_manager;
pub use abort_on_drop::*;
pub use maybe_undefined::*;

View file

@ -32,6 +32,9 @@ specta = { workspace = true }
flume = "0.10.14"
tokio-util = { version = "0.7.8", features = ["compat"] }
arc-swap = "1.6.0"
p384 = { version = "0.13.0", feature = ["ecdh"] }
ed25519-dalek = { version = "1.0.1", features = ["rand"] }
rand_core = { version = "0.5.1", feature = ["getrandom"] }
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread"] }

View file

@ -8,6 +8,7 @@ mod metadata_manager;
mod peer;
pub mod spaceblock;
pub mod spacetime;
pub mod spacetunnel;
mod utils;
pub use event::*;

View file

@ -1,5 +1,5 @@
use std::{
collections::HashSet,
collections::{HashMap, HashSet},
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64},
@ -41,7 +41,7 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
.then_some(())
.ok_or(ManagerError::InvalidAppName)?;
let peer_id = PeerId(keypair.peer_id());
let peer_id = PeerId(keypair.raw_peer_id());
let (event_stream_tx, event_stream_rx) = mpsc::channel(1024);
let (mdns, mdns_state) = Mdns::new(application_name, peer_id, metadata_manager)
@ -67,7 +67,7 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
.map(|(p, c), _| (p, StreamMuxerBox::new(c)))
.boxed(),
SpaceTime::new(this.clone()),
keypair.peer_id(),
keypair.raw_peer_id(),
)
.build();
{
@ -92,6 +92,7 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
mdns,
queued_events: Default::default(),
shutdown: AtomicBool::new(false),
on_establish_streams: HashMap::new(),
},
))
}
@ -129,6 +130,7 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
})
}
#[allow(clippy::unused_unit)] // TODO: Remove this clippy override once error handling is added
pub async fn stream(&self, peer_id: PeerId) -> Result<UnicastStream, ()> {
// TODO: With this system you can send to any random peer id. Can I reduce that by requiring `.connect(peer_id).unwrap().send(data)` or something like that.
let (tx, rx) = oneshot::channel();
@ -136,6 +138,8 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
.await;
let mut stream = rx.await.map_err(|_| {
warn!("failed to queue establishing stream to peer '{peer_id}'!");
()
})?;
stream.write_discriminator().await.unwrap(); // TODO: Error handling
Ok(stream)

View file

@ -1,5 +1,5 @@
use std::{
collections::VecDeque,
collections::{HashMap, VecDeque},
fmt,
net::SocketAddr,
sync::{
@ -57,16 +57,14 @@ impl<TMetadata: Metadata> From<Event<TMetadata>> for ManagerStreamAction<TMetada
}
/// TODO
pub struct ManagerStream<TMetadata>
where
TMetadata: Metadata,
{
pub struct ManagerStream<TMetadata: Metadata> {
pub(crate) manager: Arc<Manager<TMetadata>>,
pub(crate) event_stream_rx: mpsc::Receiver<ManagerStreamAction<TMetadata>>,
pub(crate) swarm: Swarm<SpaceTime<TMetadata>>,
pub(crate) mdns: Mdns<TMetadata>,
pub(crate) queued_events: VecDeque<Event<TMetadata>>,
pub(crate) shutdown: AtomicBool,
pub(crate) on_establish_streams: HashMap<libp2p::PeerId, Vec<OutboundRequest>>,
}
impl<TMetadata> ManagerStream<TMetadata>
@ -109,7 +107,20 @@ where
return Some(event);
}
},
SwarmEvent::ConnectionEstablished { .. } => {},
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
if let Some(streams) = self.on_establish_streams.remove(&peer_id) {
for event in streams {
self.swarm
.behaviour_mut()
.pending_events
.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event
});
}
}
},
SwarmEvent::ConnectionClosed { .. } => {},
SwarmEvent::IncomingConnection { local_addr, .. } => debug!("incoming connection from '{}'", local_addr),
SwarmEvent::IncomingConnectionError { local_addr, error, .. } => warn!("handshake error with incoming connection from '{}': {}", local_addr, error),
@ -195,7 +206,7 @@ where
.addresses(addresses.iter().map(socketaddr_to_quic_multiaddr).collect())
.build(),
) {
Ok(_) => {}
Ok(()) => {}
Err(err) => warn!(
"error dialing peer '{}' with addresses '{:?}': {}",
peer_id, addresses, err
@ -203,14 +214,45 @@ where
}
}
ManagerStreamAction::StartStream(peer_id, rx) => {
self.swarm
.behaviour_mut()
.pending_events
.push_back(ToSwarm::NotifyHandler {
peer_id: peer_id.0,
handler: NotifyHandler::Any,
event: OutboundRequest::Unicast(rx),
});
if !self.swarm.connected_peers().any(|v| *v == peer_id.0) {
let addresses = self
.mdns
.state
.discovered
.read()
.await
.get(&peer_id)
.unwrap()
.addresses
.clone();
match self.swarm.dial(
DialOpts::peer_id(peer_id.0)
.condition(PeerCondition::Disconnected)
.addresses(addresses.iter().map(socketaddr_to_quic_multiaddr).collect())
.build(),
) {
Ok(()) => {}
Err(err) => warn!(
"error dialing peer '{}' with addresses '{:?}': {}",
peer_id, addresses, err
),
}
self.on_establish_streams
.entry(peer_id.0)
.or_default()
.push(OutboundRequest::Unicast(rx));
} else {
self.swarm
.behaviour_mut()
.pending_events
.push_back(ToSwarm::NotifyHandler {
peer_id: peer_id.0,
handler: NotifyHandler::Any,
event: OutboundRequest::Unicast(rx),
});
}
}
ManagerStreamAction::BroadcastData(data) => {
let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();

View file

@ -39,7 +39,7 @@ where
service_name: String,
next_mdns_advertisement: Pin<Box<Sleep>>,
trigger_advertisement: mpsc::UnboundedReceiver<()>,
state: Arc<MdnsState<TMetadata>>,
pub(crate) state: Arc<MdnsState<TMetadata>>,
}
impl<TMetadata> Mdns<TMetadata>

View file

@ -43,7 +43,7 @@ impl BlockSize {
/// TODO
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpacedropRequest {
pub struct SpaceblockRequest {
pub name: String,
pub size: u64,
// TODO: Include file permissions
@ -62,20 +62,24 @@ pub enum SpacedropRequestError {
SizeIoError(std::io::Error),
}
impl SpacedropRequest {
impl SpaceblockRequest {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> Result<Self, SpacedropRequestError> {
let name_len = stream
.read_u16_le()
.await
.map_err(SpacedropRequestError::NameLenIoError)?;
let mut name = vec![0u8; name_len as usize];
stream
.read_exact(&mut name)
.await
.map_err(SpacedropRequestError::NameIoError)?;
let name = String::from_utf8(name).map_err(SpacedropRequestError::NameFormatError)?;
let name = {
let len = stream
.read_u16_le()
.await
.map_err(SpacedropRequestError::NameLenIoError)?;
let mut buf = vec![0u8; len as usize];
stream
.read_exact(&mut buf)
.await
.map_err(SpacedropRequestError::NameIoError)?;
String::from_utf8(buf).map_err(SpacedropRequestError::NameFormatError)?
};
let size = stream
.read_u64_le()
@ -153,7 +157,7 @@ impl<'a> Block<'a> {
/// TODO
pub struct Transfer<'a, F> {
req: &'a SpacedropRequest,
req: &'a SpaceblockRequest,
on_progress: F,
}
@ -161,7 +165,7 @@ impl<'a, F> Transfer<'a, F>
where
F: Fn(u8) + 'a,
{
pub fn new(req: &'a SpacedropRequest, on_progress: F) -> Self {
pub fn new(req: &'a SpaceblockRequest, on_progress: F) -> Self {
Self { req, on_progress }
}
@ -242,14 +246,14 @@ mod tests {
#[tokio::test]
async fn test_spaceblock_request() {
let req = SpacedropRequest {
let req = SpaceblockRequest {
name: "Demo".to_string(),
size: 42069,
block_size: BlockSize::from_size(42069),
};
let bytes = req.to_bytes();
let req2 = SpacedropRequest::from_stream(&mut Cursor::new(bytes))
let req2 = SpaceblockRequest::from_stream(&mut Cursor::new(bytes))
.await
.unwrap();
assert_eq!(req, req2);
@ -261,7 +265,7 @@ mod tests {
// This is sent out of band of Spaceblock
let data = b"Spacedrive".to_vec();
let req = SpacedropRequest {
let req = SpaceblockRequest {
name: "Demo".to_string(),
size: data.len() as u64,
block_size: BlockSize::from_size(data.len() as u64),
@ -297,7 +301,7 @@ mod tests {
let data = vec![0u8; block_size as usize * 4]; // Let's pacman some RAM
let block_size = BlockSize::dangerously_new(block_size);
let req = SpacedropRequest {
let req = SpaceblockRequest {
name: "Demo".to_string(),
size: data.len() as u64,
block_size,

View file

@ -0,0 +1,54 @@
use ed25519_dalek::PublicKey;
use rand_core::OsRng;
use thiserror::Error;
#[derive(Debug, Error)]
#[error(transparent)]
pub struct IdentityErr(#[from] ed25519_dalek::ed25519::Error);
/// TODO
pub struct Identity(ed25519_dalek::Keypair);
impl Default for Identity {
fn default() -> Self {
Self(ed25519_dalek::Keypair::generate(&mut OsRng))
}
}
impl Identity {
pub fn new() -> Self {
Self::default()
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, IdentityErr> {
Ok(Self(ed25519_dalek::Keypair::from_bytes(bytes)?))
}
pub fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes().to_vec()
}
pub fn public_key(&self) -> PublicKey {
self.0.public
}
pub fn to_remote_identity(&self) -> RemoteIdentity {
RemoteIdentity(self.0.public)
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct RemoteIdentity(ed25519_dalek::PublicKey);
impl RemoteIdentity {
pub fn from_bytes(bytes: &[u8]) -> Result<Self, IdentityErr> {
Ok(Self(ed25519_dalek::PublicKey::from_bytes(bytes)?))
}
pub fn to_bytes(&self) -> [u8; 32] {
self.0.to_bytes()
}
pub fn public_key(&self) -> PublicKey {
self.0
}
}

View file

@ -0,0 +1,7 @@
//! A system for creating encrypted tunnels between peers on untrusted connections.
mod identity;
mod tunnel;
pub use identity::*;
pub use tunnel::*;

View file

@ -0,0 +1,64 @@
use std::{
io,
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use crate::spacetime::UnicastStream;
pub struct Tunnel {
stream: UnicastStream,
}
impl Tunnel {
// TODO: Proper errors
pub async fn from_stream(mut stream: UnicastStream) -> Result<Self, &'static str> {
let discriminator = stream
.read_u8()
.await
.map_err(|_| "Error reading discriminator. Is this stream actually a tunnel?")?;
if discriminator != b'T' {
return Err("Invalid discriminator. Is this stream actually a tunnel?");
}
// TODO: Do pairing
Ok(Self { stream })
}
}
impl AsyncRead for Tunnel {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// TODO: Do decryption
Pin::new(&mut self.get_mut().stream).poll_read(cx, buf)
}
}
impl AsyncWrite for Tunnel {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
// TODO: Do encryption
Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().stream).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().stream).poll_shutdown(cx)
}
}
// TODO: Unit tests

View file

@ -9,7 +9,14 @@ impl Keypair {
Self(ed25519::Keypair::generate())
}
pub fn peer_id(&self) -> libp2p::PeerId {
pub fn peer_id(&self) -> crate::PeerId {
let pk: libp2p::identity::PublicKey = self.0.public().into();
crate::PeerId(libp2p::PeerId::from_public_key(&pk))
}
// TODO: Maybe try and remove
pub fn raw_peer_id(&self) -> libp2p::PeerId {
let pk: libp2p::identity::PublicKey = self.0.public().into();
libp2p::PeerId::from_public_key(&pk)

View file

@ -9,6 +9,12 @@ pub struct PeerId(
pub(crate) libp2p::PeerId,
);
// impl PeerId {
// pub fn to_string(&self) -> String {
// self.0.to_string()
// }
// }
impl FromStr for PeerId {
#[allow(deprecated)]
type Err = libp2p::core::ParseError;

View file

@ -2,7 +2,13 @@ import { Laptop } from '@sd/assets/icons';
import clsx from 'clsx';
import { useEffect, useState } from 'react';
import { Link, NavLink } from 'react-router-dom';
import { arraysEqual, useBridgeQuery, useLibraryQuery, useOnlineLocations } from '@sd/client';
import {
arraysEqual,
useBridgeQuery,
useFeatureFlag,
useLibraryQuery,
useOnlineLocations
} from '@sd/client';
import { AddLocationButton } from '~/app/$libraryId/settings/library/locations/AddLocationButton';
import { Folder } from '~/components/Folder';
import { SubtleButton } from '~/components/SubtleButton';
@ -26,6 +32,7 @@ export const LibrarySection = () => {
const locations = useLibraryQuery(['locations.list'], { keepPreviousData: true });
const tags = useLibraryQuery(['tags.list'], { keepPreviousData: true });
const onlineLocations = useOnlineLocations();
const isPairingEnabled = useFeatureFlag('p2pPairing');
const [triggeredContextItem, setTriggeredContextItem] = useState<TriggeredContextItem | null>(
null
);
@ -47,9 +54,13 @@ export const LibrarySection = () => {
<Section
name="Nodes"
actionArea={
<Link to="settings/library/nodes">
isPairingEnabled ? (
<Link to="settings/library/nodes">
<SubtleButton />
</Link>
) : (
<SubtleButton />
</Link>
)
}
>
{/* <SidebarLink className="relative w-full group" to={`/`}>

View file

@ -12,6 +12,7 @@ import {
ShieldCheck,
TagSimple
} from 'phosphor-react';
import { useFeatureFlag } from '@sd/client';
import { tw } from '@sd/ui';
import { useOperatingSystem } from '~/hooks/useOperatingSystem';
import Icon from '../Layout/Sidebar/Icon';
@ -23,6 +24,7 @@ const Section = tw.div`space-y-0.5`;
export default () => {
const os = useOperatingSystem();
const isPairingEnabled = useFeatureFlag('p2pPairing');
return (
<div className="custom-scroll no-scrollbar h-full w-60 max-w-[180px] shrink-0 border-r border-app-line/50 pb-5">
@ -68,7 +70,7 @@ export default () => {
<Icon component={GearSix} />
General
</SidebarLink>
<SidebarLink to="library/nodes" disabled>
<SidebarLink to="library/nodes" disabled={!isPairingEnabled}>
<Icon component={ShareNetwork} />
Nodes
</SidebarLink>

View file

@ -1,4 +1,4 @@
import { useBridgeMutation, useLibraryContext } from '@sd/client';
import { MaybeUndefined, useBridgeMutation, useLibraryContext } from '@sd/client';
import { Button, Input, dialogManager } from '@sd/ui';
import { useZodForm, z } from '@sd/ui/src/forms';
import { useDebouncedFormWatch } from '~/hooks';
@ -9,23 +9,31 @@ import DeleteLibraryDialog from '../node/libraries/DeleteDialog';
const schema = z.object({
id: z.string(),
name: z.string().min(1),
description: z.string()
description: z.string().nullable()
});
// TODO: With some extra upstream Specta work this should be able to be removed
function toMaybeUndefined<T>(v: T | null | undefined): MaybeUndefined<T> {
return v as any;
}
export const Component = () => {
const { library } = useLibraryContext();
const editLibrary = useBridgeMutation('library.edit');
const form = useZodForm({
schema,
defaultValues: { id: library!.uuid, ...library?.config }
defaultValues: {
id: library!.uuid,
...library?.config
}
});
useDebouncedFormWatch(form, (value) =>
editLibrary.mutate({
id: library.uuid,
name: value.name ?? null,
description: value.description ?? null
description: toMaybeUndefined(value.description)
})
);

View file

@ -1,12 +1,45 @@
import { useDiscoveredPeers, useFeatureFlag, useLibraryMutation } from '@sd/client';
import { Button } from '@sd/ui';
import { Heading } from '../Layout';
export const Component = () => {
const isPairingEnabled = useFeatureFlag('p2pPairing');
return (
<>
<Heading
title="Nodes"
description="Manage the nodes connected to this library. A node is an instance of Spacedrive's backend, running on a device or server. Each node carries a copy of the database and synchronizes via peer-to-peer connections in realtime."
/>
{/* TODO: Show paired nodes + unpair button */}
{isPairingEnabled && <IncorrectP2PPairingPane />}
</>
);
};
// TODO: This entire component shows a UI which is pairing by node but that is just not how it works.
function IncorrectP2PPairingPane() {
const onlineNodes = useDiscoveredPeers();
const p2pPair = useLibraryMutation('p2p.pair', {
onSuccess(data) {
console.log(data);
}
});
console.log(onlineNodes);
return (
<>
<h1>Pairing</h1>
{[...onlineNodes.entries()].map(([id, node]) => (
<div key={id} className="flex space-x-2">
<p>{node.name}</p>
<Button onClick={() => p2pPair.mutate(id)}>Pair</Button>
</div>
))}
</>
);
}

View file

@ -16,7 +16,7 @@ export type Procedures = {
{ key: "locations.indexer_rules.get", input: LibraryArgs<number>, result: IndexerRule } |
{ key: "locations.indexer_rules.list", input: LibraryArgs<null>, result: IndexerRule[] } |
{ key: "locations.indexer_rules.listForLocation", input: LibraryArgs<number>, result: IndexerRule[] } |
{ key: "locations.list", input: LibraryArgs<null>, result: { id: number; pub_id: number[]; node_id: number | null; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; node: Node | null }[] } |
{ key: "locations.list", input: LibraryArgs<null>, result: { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; node_id: number | null; node: Node | null }[] } |
{ key: "nodeState", input: never, result: NodeState } |
{ key: "search.objects", input: LibraryArgs<ObjectSearchArgs>, result: SearchData<ExplorerItem> } |
{ key: "search.paths", input: LibraryArgs<FilePathSearchArgs>, result: SearchData<ExplorerItem> } |
@ -55,8 +55,9 @@ export type Procedures = {
{ key: "locations.indexer_rules.delete", input: LibraryArgs<number>, result: null } |
{ key: "locations.relink", input: LibraryArgs<string>, result: null } |
{ key: "locations.update", input: LibraryArgs<LocationUpdateArgs>, result: null } |
{ key: "nodes.changeNodeName", input: ChangeNodeNameArgs, result: NodeConfig } |
{ key: "nodes.changeNodeName", input: ChangeNodeNameArgs, result: null } |
{ key: "p2p.acceptSpacedrop", input: [string, string | null], result: null } |
{ key: "p2p.pair", input: LibraryArgs<PeerId>, result: number } |
{ key: "p2p.spacedrop", input: SpacedropArgs, result: string | null } |
{ key: "tags.assign", input: LibraryArgs<TagAssignArgs>, result: null } |
{ key: "tags.create", input: LibraryArgs<TagCreateArgs>, result: Tag } |
@ -90,7 +91,7 @@ export type CreateLibraryArgs = { name: string }
export type DiskType = "SSD" | "HDD" | "Removable"
export type EditLibraryArgs = { id: string; name: string | null; description: string | null }
export type EditLibraryArgs = { id: string; name: string | null; description: MaybeUndefined<string> }
export type ExplorerItem = { type: "Path"; has_local_thumbnail: boolean; thumbnail_key: string[] | null; item: FilePathWithObject } | { type: "Object"; has_local_thumbnail: boolean; thumbnail_key: string[] | null; item: ObjectWithFilePaths }
@ -151,16 +152,11 @@ export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Faile
*/
export type LibraryArgs<T> = { library_id: string; arg: T }
/**
* LibraryConfig holds the configuration for a specific library. This is stored as a '{uuid}.sdlibrary' file.
*/
export type LibraryConfig = { name: string; description: string }
export type LibraryConfigWrapped = { uuid: string; config: LibraryConfig }
export type LibraryConfigWrapped = { uuid: string; config: SanitisedLibraryConfig }
export type LightScanArgs = { location_id: number; sub_path: string }
export type Location = { id: number; pub_id: number[]; node_id: number | null; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null }
export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; node_id: number | null }
/**
* `LocationCreateArgs` is the argument received from the client using `rspc` to create a new location.
@ -179,18 +175,15 @@ export type LocationCreateArgs = { path: string; dry_run: boolean; indexer_rules
*/
export type LocationUpdateArgs = { id: number; name: string | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; indexer_rules_ids: number[] }
export type LocationWithIndexerRules = { id: number; pub_id: number[]; node_id: number | null; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; indexer_rules: { indexer_rule: IndexerRule }[] }
export type LocationWithIndexerRules = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; node_id: number | null; indexer_rules: { indexer_rule: IndexerRule }[] }
export type MaybeNot<T> = T | { not: T }
export type MaybeUndefined<T> = null | null | T
export type MediaData = { id: number; pixel_width: number | null; pixel_height: number | null; longitude: number | null; latitude: number | null; fps: number | null; capture_device_make: string | null; capture_device_model: string | null; capture_device_software: string | null; duration_seconds: number | null; codecs: string | null; streams: number | null }
export type Node = { id: number; pub_id: number[]; name: string; platform: number; date_created: string }
/**
* NodeConfig is the configuration for a node. This is shared between all libraries and is stored in a JSON file on disk.
*/
export type NodeConfig = { id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null }
export type Node = { id: number; pub_id: number[]; name: string; platform: number; date_created: string; identity: number[] | null; node_peer_id: string | null }
export type NodeState = ({ id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null }) & { data_path: string }
@ -239,6 +232,8 @@ export type RenameOne = { from_file_path_id: number; to: string }
export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent"
export type SanitisedLibraryConfig = { name: string; description: string | null; node_id: string }
export type SanitisedNodeConfig = { id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null }
export type SearchData<T> = { cursor: number[] | null; items: T[] }