[ENG-759] P2P Cleanup (#1062)

* less stupid name

* yeet

* awaiting futures is kinda important lol

* no-async

* more proto stuff

* cleanup

* move it round

* None of my homies like broadcast

* minor

* do the shuffle

* restore after force push

* reusing `sysinfo::System` as intended

* fix lol

* remove `node_id` from `Volume`

* Remove `node_local_id` from `Library`

* Remove `Job` to `Node` relation

* feature flags be like

* press save 4head

* remove `Location` -> `Node` relation

* `volume.rs` to `volume/mod.rs`

* yes

* add `Instance` model and deprecate `Node` model

* pairing is better now

* Pretty code

* thinking in code

* wip

* What if Observables but in Rust?!

* Observables aren't it + `useP2PEvents` hook

* more around some jsx

* Trade offer: bad code for working pairing?

* Unit test pairing protocol

* fix everything up

* corrections

* Have you got a moment for our lord and saviour Clippy

* tsc --fixMyCode

* Prisma being wacky

* Config files being f'ed up

* broken config after migrating

* Zed being Zed

* Argh

* cliipzy

* rewind

* Fix truncate logic

* wip: instances in peer metadata

* Rethink instance ids

* fix

* whoops

---------

Co-authored-by: Brendan Allan <brendonovich@outlook.com>
This commit is contained in:
Oscar Beaumont 2023-07-12 08:23:30 +02:00 committed by GitHub
parent c877c03b63
commit cf39f8dbcc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
65 changed files with 1870 additions and 1078 deletions

5
Cargo.lock generated
View file

@ -7268,6 +7268,7 @@ dependencies = [
"tokio-util",
"tracing 0.1.37",
"tracing-subscriber 0.3.17",
"uuid",
]
[[package]]
@ -9324,9 +9325,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.3.3"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2"
checksum = "d023da39d1fde5a8a3fe1f3e01ca9632ada0a63e9797de55a879d6e2236277be"
dependencies = [
"getrandom 0.2.9",
"serde",

View file

@ -330,7 +330,8 @@ pub async fn reveal_items(
.db
.location()
.find_many(vec![
location::node_id::equals(Some(library.node_local_id)),
// TODO(N): This will fall apart with removable media and is making an invalid assumption that the `Node` is fixed for an `Instance`.
location::instance_id::equals(Some(library.config.instance_id)),
location::id::in_vec(locations),
])
.select(location::select!({ path }))

View file

@ -4,7 +4,6 @@ import { Animated, FlatList, Pressable, Text, View } from 'react-native';
import { Swipeable } from 'react-native-gesture-handler';
import {
Location,
Node,
arraysEqual,
useLibraryMutation,
useLibraryQuery,
@ -19,7 +18,7 @@ import { tw, twStyle } from '~/lib/tailwind';
import { SettingsStackScreenProps } from '~/navigation/SettingsNavigator';
type LocationItemProps = {
location: Location & { node: Node | null };
location: Location;
index: number;
navigation: SettingsStackScreenProps<'LocationSettings'>['navigation'];
};
@ -109,13 +108,14 @@ function LocationItem({ location, index, navigation }: LocationItemProps) {
<Text numberOfLines={1} style={tw`text-sm font-semibold text-ink`}>
{location.name}
</Text>
{location.node && (
{/* // TODO: This is ephemeral so it should not come from the DB. Eg. a external USB can move between nodes */}
{/* {location.node && (
<View style={tw`mt-0.5 self-start rounded bg-app-highlight px-1 py-[1px]`}>
<Text numberOfLines={1} style={tw`text-xs font-semibold text-ink-dull`}>
{location.node.name}
</Text>
</View>
)}
)} */}
<Text
numberOfLines={1}
style={tw`mt-0.5 text-[10px] font-semibold text-ink-dull`}

View file

@ -0,0 +1,95 @@
/*
Warnings:
- You are about to drop the column `node_id` on the `job` table. All the data in the column will be lost.
- You are about to drop the column `node_id` on the `shared_operation` table. All the data in the column will be lost.
- You are about to drop the column `node_id` on the `location` table. All the data in the column will be lost.
- You are about to drop the column `node_id` on the `volume` table. All the data in the column will be lost.
- Added the required column `instance_id` to the `shared_operation` table without a default value. This is not possible if the table is not empty.
*/
-- CreateTable
CREATE TABLE "instance" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"identity" BLOB NOT NULL,
"node_id" BLOB NOT NULL,
"node_name" TEXT NOT NULL,
"node_platform" INTEGER NOT NULL,
"last_seen" DATETIME NOT NULL,
"date_created" DATETIME NOT NULL
);
-- RedefineTables
PRAGMA foreign_keys=OFF;
CREATE TABLE "new_job" (
"id" BLOB NOT NULL PRIMARY KEY,
"name" TEXT,
"action" TEXT,
"status" INTEGER,
"errors_text" TEXT,
"data" BLOB,
"metadata" BLOB,
"parent_id" BLOB,
"task_count" INTEGER,
"completed_task_count" INTEGER,
"date_estimated_completion" DATETIME,
"date_created" DATETIME,
"date_started" DATETIME,
"date_completed" DATETIME,
CONSTRAINT "job_parent_id_fkey" FOREIGN KEY ("parent_id") REFERENCES "job" ("id") ON DELETE CASCADE ON UPDATE CASCADE
);
INSERT INTO "new_job" ("action", "completed_task_count", "data", "date_completed", "date_created", "date_estimated_completion", "date_started", "errors_text", "id", "metadata", "name", "parent_id", "status", "task_count") SELECT "action", "completed_task_count", "data", "date_completed", "date_created", "date_estimated_completion", "date_started", "errors_text", "id", "metadata", "name", "parent_id", "status", "task_count" FROM "job";
DROP TABLE "job";
ALTER TABLE "new_job" RENAME TO "job";
CREATE TABLE "new_shared_operation" (
"id" BLOB NOT NULL PRIMARY KEY,
"timestamp" BIGINT NOT NULL,
"model" TEXT NOT NULL,
"record_id" BLOB NOT NULL,
"kind" TEXT NOT NULL,
"data" BLOB NOT NULL,
"instance_id" INTEGER NOT NULL,
CONSTRAINT "shared_operation_instance_id_fkey" FOREIGN KEY ("instance_id") REFERENCES "instance" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
);
-- INSERT INTO "new_shared_operation" ("data", "id", "kind", "model", "record_id", "timestamp") SELECT "data", "id", "kind", "model", "record_id", "timestamp" FROM "shared_operation";
DROP TABLE "shared_operation";
ALTER TABLE "new_shared_operation" RENAME TO "shared_operation";
CREATE TABLE "new_location" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"pub_id" BLOB NOT NULL,
"name" TEXT,
"path" TEXT,
"total_capacity" INTEGER,
"available_capacity" INTEGER,
"is_archived" BOOLEAN,
"generate_preview_media" BOOLEAN,
"sync_preview_media" BOOLEAN,
"hidden" BOOLEAN,
"date_created" DATETIME,
"instance_id" INTEGER
);
INSERT INTO "new_location" ("available_capacity", "date_created", "generate_preview_media", "hidden", "id", "is_archived", "name", "path", "pub_id", "sync_preview_media", "total_capacity") SELECT "available_capacity", "date_created", "generate_preview_media", "hidden", "id", "is_archived", "name", "path", "pub_id", "sync_preview_media", "total_capacity" FROM "location";
DROP TABLE "location";
ALTER TABLE "new_location" RENAME TO "location";
CREATE UNIQUE INDEX "location_pub_id_key" ON "location"("pub_id");
CREATE TABLE "new_volume" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"name" TEXT NOT NULL,
"mount_point" TEXT NOT NULL,
"total_bytes_capacity" TEXT NOT NULL DEFAULT '0',
"total_bytes_available" TEXT NOT NULL DEFAULT '0',
"disk_type" TEXT,
"filesystem" TEXT,
"is_system" BOOLEAN NOT NULL DEFAULT false,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO "new_volume" ("date_modified", "disk_type", "filesystem", "id", "is_system", "mount_point", "name", "total_bytes_available", "total_bytes_capacity") SELECT "date_modified", "disk_type", "filesystem", "id", "is_system", "mount_point", "name", "total_bytes_available", "total_bytes_capacity" FROM "volume";
DROP TABLE "volume";
ALTER TABLE "new_volume" RENAME TO "volume";
CREATE UNIQUE INDEX "volume_mount_point_name_key" ON "volume"("mount_point", "name");
PRAGMA foreign_key_check;
PRAGMA foreign_keys=ON;
-- CreateIndex
CREATE UNIQUE INDEX "instance_pub_id_key" ON "instance"("pub_id");

View file

@ -26,12 +26,52 @@ model SharedOperation {
kind String
data Bytes
node_id Int
node Node @relation(fields: [node_id], references: [id])
instance_id Int
instance Instance @relation(fields: [instance_id], references: [id])
// attestation Bytes
@@map("shared_operation")
}
/// @deprecated: This model has to exist solely for backwards compatibility.
model Node {
id Int @id @default(autoincrement())
pub_id Bytes @unique
name String
// Enum: sd_core::node::Platform
platform Int
date_created DateTime
identity Bytes? // TODO: Change to required field in future
node_peer_id String? // TODO: Remove as part of - https://linear.app/spacedriveapp/issue/ENG-757/p2p-library-portability
@@map("node")
}
/// @local
// represents a single `.db` file (SQLite DB) that is paired to the current library.
// A `LibraryInstance` is always owned by a single `Node` but it's possible for that node to change (or two to be owned by a single node).
model Instance {
id Int @id @default(autoincrement()) // This is is NOT globally unique
pub_id Bytes @unique // This UUID is meaningless and exists soley cause the `uhlc::ID` must be 16-bit. Really this should be derived from the `identity` field.
// Enum: sd_p2p::identity::Identity
identity Bytes
node_id Bytes
node_name String
// Enum: sd_core::node::Platform
node_platform Int
last_seen DateTime // Time core started for owner, last P2P message for P2P node
date_created DateTime
// attestation Bytes
SharedOperation SharedOperation[]
@@map("instance")
}
model Statistics {
id Int @id @default(autoincrement())
date_captured DateTime @default(now())
@ -46,29 +86,9 @@ model Statistics {
@@map("statistics")
}
/// @local(id: pub_id)
model Node {
id Int @id @default(autoincrement())
pub_id Bytes @unique
name String
// Enum: sd_core::node::Platform
platform Int
date_created DateTime
identity Bytes? // TODO: Change to required field in future
node_peer_id String? // TODO: Remove as part of - https://linear.app/spacedriveapp/issue/ENG-757/p2p-library-portability
jobs Job[]
Location Location[]
SharedOperation SharedOperation[]
@@map("node")
}
/// @local
model Volume {
id Int @id @default(autoincrement())
node_id Int
name String
mount_point String
total_bytes_capacity String @default("0")
@ -78,7 +98,7 @@ model Volume {
is_system Boolean @default(false)
date_modified DateTime @default(now())
@@unique([node_id, mount_point, name])
@@unique([mount_point, name])
@@map("volume")
}
@ -97,8 +117,8 @@ model Location {
hidden Boolean?
date_created DateTime?
node_id Int?
node Node? @relation(fields: [node_id], references: [id])
instance_id Int?
// instance Instance? @relation(fields: [instance_id], references: [id]) // TODO: Enabling this breaks migration 7's `update_many` in `library/config.rs`
file_paths FilePath[]
indexer_rules IndexerRulesInLocation[]
@ -357,9 +377,8 @@ model ObjectInSpace {
model Job {
id Bytes @id
name String?
node_id Int?
action String? // Will be composed of "{action_description}(-{children_order})*"
name String?
action String? // Will be composed of "{action_description}(-{children_order})*"
// Enum: sd_core::job::job_manager:JobStatus
status Int? // 0 = Queued
@ -380,8 +399,6 @@ model Job {
date_started DateTime? // Started execution
date_completed DateTime? // Finished execution
node Node? @relation(fields: [node_id], references: [id], onDelete: Cascade, onUpdate: Cascade)
parent Job? @relation("jobs_dependency", fields: [parent_id], references: [id], onDelete: Cascade, onUpdate: Cascade)
children Job[] @relation("jobs_dependency")
@ -461,8 +478,8 @@ model IndexerRulesInLocation {
/// @shared(id: key)
model Preference {
key String @id
value Bytes?
key String @id
value Bytes?
@@map("preference")
}

View file

@ -1,12 +1,8 @@
use crate::{
library::{LibraryConfig, LibraryName},
prisma::statistics,
util::MaybeUndefined,
volume::get_volumes,
};
use crate::{library::LibraryName, util::MaybeUndefined, volume::get_volumes};
use chrono::Utc;
use rspc::alpha::AlphaRouter;
use sd_prisma::prisma::statistics;
use serde::Deserialize;
use specta::Type;
use tracing::debug;
@ -95,10 +91,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
let new_library = ctx
.library_manager
.create(
LibraryConfig::new(args.name, ctx.config.get().await.id),
ctx.config.get().await,
)
.create(args.name, None, ctx.config.get().await)
.await?;
Ok(new_library)

View file

@ -52,7 +52,6 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.location()
.find_many(vec![])
.order_by(location::date_created::order(SortOrder::Desc))
.include(location::include!({ node }))
.exec()
.await?)
})

View file

@ -1,9 +1,11 @@
use crate::prisma::{location, node};
use crate::prisma::location;
use rspc::{alpha::AlphaRouter, ErrorCode};
use sd_prisma::prisma::instance;
use serde::Deserialize;
use specta::Type;
use tracing::error;
use uuid::Uuid;
use super::{locations::ExplorerItem, utils::library, Ctx, R};
@ -45,38 +47,36 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
// TODO: add pagination!! and maybe ordering etc
.procedure("listLocations", {
R.with2(library())
.query(|(ctx, library), _node_id: Option<String>| async move {
// 1. grab currently active node
let node_config = ctx.config.get().await;
let node_pub_id = node_config.id.as_bytes().to_vec();
// 2. get node from database
let node = library
// TODO: I don't like this. `node_id` should probs be a machine hash or something cause `node_id` is dynamic in the context of P2P and what does it mean for removable media to be owned by a node?
.query(|(_, library), node_id: Option<Uuid>| async move {
// Be aware multiple instances can exist on a single node. This is generally an edge case but it's possible.
let instances = library
.db
.node()
.find_unique(node::pub_id::equals(node_pub_id))
.instance()
.find_many(vec![node_id
.map(|id| instance::node_id::equals(id.as_bytes().to_vec()))
.unwrap_or(instance::id::equals(library.config.instance_id))])
.exec()
.await?;
if let Some(node) = node {
// query for locations with that node id
let locations: Vec<ExplorerItem> = library
.db
.location()
.find_many(vec![location::node_id::equals(Some(node.id))])
.exec()
.await?
.into_iter()
.map(|location| ExplorerItem::Location {
has_local_thumbnail: false,
thumbnail_key: None,
item: location,
})
.collect();
return Ok(locations);
}
Ok(vec![])
Ok(library
.db
.location()
.find_many(
instances
.into_iter()
.map(|i| location::instance_id::equals(Some(i.id)))
.collect(),
)
.exec()
.await?
.into_iter()
.map(|location| ExplorerItem::Location {
has_local_thumbnail: false,
thumbnail_key: None,
item: location,
})
.collect::<Vec<_>>())
})
})
}

View file

@ -5,9 +5,9 @@ use specta::Type;
use std::path::PathBuf;
use uuid::Uuid;
use crate::p2p::P2PEvent;
use crate::p2p::{P2PEvent, PairingDecision};
use super::{utils::library, Ctx, R};
use super::{Ctx, R};
pub(crate) fn mount() -> AlphaRouter<Ctx> {
R.router()
@ -67,6 +67,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
}
})
})
// TODO: Send this over `p2p.events`
.procedure("spacedropProgress", {
R.subscription(|ctx, id: Uuid| async move {
ctx.p2p.spacedrop_progress(id).await.ok_or_else(|| {
@ -75,7 +76,17 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
})
})
.procedure("pair", {
R.with2(library())
.mutation(|(ctx, lib), id: PeerId| async move { ctx.p2p.pair(id, lib) })
R.mutation(|ctx, id: PeerId| async move {
ctx.p2p
.pairing
.clone()
.originator(id, ctx.config.get().await)
.await
})
})
.procedure("pairingResponse", {
R.mutation(|ctx, (pairing_id, decision): (u16, PairingDecision)| {
ctx.p2p.pairing.decision(pairing_id, decision);
})
})
}

View file

@ -1,6 +1,6 @@
use crate::{
library::Library,
prisma::{job, node},
prisma::job,
util::db::{chain_optional_iter, maybe_missing, MissingFieldError},
};
@ -205,7 +205,6 @@ impl JobReport {
self.id.as_bytes().to_vec(),
chain_optional_iter(
[
job::node::connect(node::id::equals(library.node_local_id)),
job::name::set(Some(self.name.clone())),
job::action::set(self.action.clone()),
job::data::set(self.data.clone()),

View file

@ -200,21 +200,6 @@ impl Node {
self.p2p.shutdown().await;
info!("Spacedrive Core shutdown successful!");
}
// pub async fn begin_guest_peer_request(
// &self,
// peer_id: String,
// ) -> Option<Receiver<peer_request::guest::State>> {
// let mut pr_guard = self.peer_request.lock().await;
// if pr_guard.is_some() {
// return None;
// }
// let (req, stream) = peer_request::guest::PeerRequest::new_actor(peer_id);
// *pr_guard = Some(PeerRequest::Guest(req));
// Some(stream)
// }
}
/// Error type for Node related errors.

View file

@ -1,4 +1,5 @@
use crate::{
node::{NodeConfig, Platform},
prisma::{file_path, indexer_rule, PrismaClient},
util::{
db::{maybe_missing, uuid_to_bytes},
@ -6,8 +7,9 @@ use crate::{
},
};
use sd_p2p::{spacetunnel::Identity, PeerId};
use sd_prisma::prisma::node;
use chrono::Utc;
use sd_p2p::spacetunnel::Identity;
use sd_prisma::prisma::{instance, location, node};
use std::{path::PathBuf, sync::Arc};
@ -21,54 +23,21 @@ use uuid::Uuid;
use super::name::LibraryName;
/// LibraryConfig holds the configuration for a specific library. This is stored as a '{uuid}.sdlibrary' file.
#[derive(Debug, Serialize, Deserialize, Clone)] // If you are adding `specta::Type` on this your probably about to leak the P2P private key
#[derive(Debug, Serialize, Deserialize, Clone, Type)]
pub struct LibraryConfig {
/// name is the display name of the library. This is used in the UI and is set by the user.
pub name: LibraryName,
/// description is a user set description of the library. This is used in the UI and is set by the user.
pub description: Option<String>,
/// P2P identity of this library.
pub identity: Vec<u8>,
/// Id of the current node
pub node_id: Uuid,
// /// is_encrypted is a flag that is set to true if the library is encrypted.
// #[serde(default)]
// pub is_encrypted: bool,
}
#[derive(Debug, Serialize, Deserialize, Clone, Type)]
pub struct SanitisedLibraryConfig {
pub name: LibraryName,
pub description: Option<String>,
pub node_id: Uuid,
}
impl From<LibraryConfig> for SanitisedLibraryConfig {
fn from(config: LibraryConfig) -> Self {
Self {
name: config.name,
description: config.description,
node_id: config.node_id,
}
}
}
impl LibraryConfig {
pub fn new(name: LibraryName, node_id: Uuid) -> Self {
Self {
name,
description: None,
identity: Identity::new().to_bytes().to_vec(),
node_id,
}
}
/// id of the current instance so we know who this `.db` is. This can be looked up within the `Instance` table.
pub instance_id: i32,
}
#[async_trait::async_trait]
impl Migrate for LibraryConfig {
const CURRENT_VERSION: u32 = 5;
const CURRENT_VERSION: u32 = 7;
type Ctx = (Uuid, PeerId, Arc<PrismaClient>);
type Ctx = (NodeConfig, Arc<PrismaClient>);
fn default(path: PathBuf) -> Result<Self, MigratorError> {
Err(MigratorError::ConfigFileMissing(path))
@ -77,7 +46,7 @@ impl Migrate for LibraryConfig {
async fn migrate(
to_version: u32,
config: &mut serde_json::Map<String, serde_json::Value>,
(node_id, peer_id, db): &Self::Ctx,
(node_config, db): &Self::Ctx,
) -> Result<(), MigratorError> {
match to_version {
0 => {}
@ -130,14 +99,16 @@ impl Migrate for LibraryConfig {
.update_many(
vec![],
vec![
node::pub_id::set(node_id.as_bytes().to_vec()),
node::node_peer_id::set(Some(peer_id.to_string())),
node::pub_id::set(node_config.id.as_bytes().to_vec()),
node::node_peer_id::set(Some(
node_config.keypair.peer_id().to_string(),
)),
],
)
.exec()
.await?;
config.insert("node_id".into(), Value::String(node_id.to_string()));
config.insert("node_id".into(), Value::String(node_config.id.to_string()));
}
4 => {} // -_-
5 => loop {
@ -189,6 +160,63 @@ impl Migrate for LibraryConfig {
)
.await?;
},
6 => {
let nodes = db.node().find_many(vec![]).exec().await?;
if nodes.is_empty() {
println!("6 - No nodes found... How did you even get this far? but this is fine we can fix it.");
} else if nodes.len() > 1 {
return Err(MigratorError::Custom(
"6 - More than one node found in the DB... This can't be automatically reconciled!"
.into(),
));
}
let node = nodes.first();
let now = Utc::now().fixed_offset();
let instance_id = Uuid::new_v4();
instance::Create {
pub_id: instance_id.as_bytes().to_vec(),
identity: node
.and_then(|n| n.identity.clone())
.unwrap_or_else(|| Identity::new().to_bytes()),
node_id: node_config.id.as_bytes().to_vec(),
node_name: node_config.name.clone(),
node_platform: Platform::current() as i32,
last_seen: now,
date_created: node.map(|n| n.date_created).unwrap_or_else(|| now),
_params: vec![],
}
.to_query(db)
.exec()
.await?;
config.remove("node_id");
config.remove("identity");
config.insert("instance_id".into(), Value::String(instance_id.to_string()));
}
7 => {
let instances = db.instance().find_many(vec![]).exec().await?;
if instances.len() > 1 {
return Err(MigratorError::Custom(
"7 - More than one node found in the DB... This can't be automatically reconciled!"
.into(),
));
}
let Some(instance) = instances.first() else {
return Err(MigratorError::Custom(
"7 - No nodes found... How did you even get this far?!".into(),
));
};
// We are relinking all locations to the current instance.
// If you have more than one node in your database and your not @Oscar, something went horribly wrong so this is fine.
db.location()
.update_many(vec![], vec![location::instance_id::set(Some(instance.id))])
.exec()
.await?;
}
v => unreachable!("Missing migration for library version {}", v),
}
@ -200,5 +228,5 @@ impl Migrate for LibraryConfig {
#[derive(Serialize, Deserialize, Debug, Type)]
pub struct LibraryConfigWrapped {
pub uuid: Uuid,
pub config: SanitisedLibraryConfig,
pub config: LibraryConfig,
}

View file

@ -31,8 +31,6 @@ use super::{LibraryConfig, LibraryManagerError};
pub struct Library {
/// id holds the ID of the current library.
pub id: Uuid,
/// local_id holds the local ID of the current library.
pub local_id: i32,
/// config holds the configuration of the current library.
pub config: LibraryConfig,
/// db holds the database client for the current library.
@ -40,8 +38,6 @@ pub struct Library {
pub sync: Arc<SyncManager>,
/// key manager that provides encryption keys to functions that require them
// pub key_manager: Arc<KeyManager>,
/// node_local_id holds the local ID of the node which is running the library.
pub node_local_id: i32,
/// node_context holds the node context for the node which this library is running on.
pub node_context: NodeContext,
/// p2p identity
@ -57,7 +53,6 @@ impl Debug for Library {
.field("id", &self.id)
.field("config", &self.config)
.field("db", &self.db)
.field("node_local_id", &self.node_local_id)
.finish()
}
}
@ -102,8 +97,9 @@ impl Library {
self.db
.file_path()
.find_many(vec![
file_path::location::is(vec![location::node_id::equals(Some(
self.node_local_id,
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
file_path::location::is(vec![location::instance_id::equals(Some(
self.config.instance_id,
))]),
file_path::id::in_vec(ids),
])

View file

@ -3,7 +3,7 @@ use crate::{
location::{indexer, LocationManagerError},
node::{NodeConfig, Platform},
object::{orphan_remover::OrphanRemoverActor, tag},
prisma::{location, node},
prisma::location,
sync::{SyncManager, SyncMessage},
util::{
db::{self, MissingFieldError},
@ -20,8 +20,9 @@ use std::{
sync::Arc,
};
use chrono::Local;
use chrono::Utc;
use sd_p2p::spacetunnel::{Identity, IdentityErr};
use sd_prisma::prisma::instance;
use thiserror::Error;
use tokio::{
fs, io,
@ -92,8 +93,8 @@ pub enum LibraryManagerError {
LocationWatcher(#[from] LocationManagerError),
#[error("failed to parse library p2p identity: {0}")]
Identity(#[from] IdentityErr),
#[error("current node with id '{0}' was not found in the database")]
CurrentNodeNotFound(String),
#[error("current instance with id '{0}' was not found in the database")]
CurrentInstanceNotFound(String),
#[error("missing-field: {0}")]
MissingField(#[from] MissingFieldError),
}
@ -197,25 +198,33 @@ impl LibraryManager {
/// create creates a new library with the given config and mounts it into the running [LibraryManager].
pub(crate) async fn create(
&self,
config: LibraryConfig,
name: LibraryName,
description: Option<String>,
node_cfg: NodeConfig,
) -> Result<LibraryConfigWrapped, LibraryManagerError> {
self.create_with_uuid(Uuid::new_v4(), config, node_cfg)
self.create_with_uuid(Uuid::new_v4(), name, description, node_cfg)
.await
}
pub(crate) async fn create_with_uuid(
&self,
id: Uuid,
config: LibraryConfig,
name: LibraryName,
description: Option<String>,
node_cfg: NodeConfig,
) -> Result<LibraryConfigWrapped, LibraryManagerError> {
if config.name.is_empty() || config.name.chars().all(|x| x.is_whitespace()) {
if name.as_ref().is_empty() || name.as_ref().chars().all(|x| x.is_whitespace()) {
return Err(LibraryManagerError::InvalidConfig(
"name cannot be empty".to_string(),
));
}
let config = LibraryConfig {
name,
description,
instance_id: 0, // First instance will always be zero
};
let config_path = self.libraries_dir.join(format!("{id}.sdlibrary"));
config.save(&config_path)?;
@ -225,21 +234,22 @@ impl LibraryManager {
config_path.display()
);
let now = Utc::now().fixed_offset();
let library = Self::load(
id,
self.libraries_dir.join(format!("{id}.db")),
config_path,
self.node_context.clone(),
&self.subscribers,
Some(node::Create {
pub_id: config.node_id.as_bytes().to_vec(),
name: node_cfg.name.clone(),
platform: Platform::current() as i32,
date_created: Local::now().into(),
_params: vec![
node::identity::set(Some(config.identity.clone())),
node::node_peer_id::set(Some(node_cfg.keypair.peer_id().to_string())),
],
Some(instance::Create {
pub_id: Uuid::new_v4().as_bytes().to_vec(),
identity: Identity::new().to_bytes(),
node_id: node_cfg.id.as_bytes().to_vec(),
node_name: node_cfg.name.clone(),
node_platform: Platform::current() as i32,
last_seen: now,
date_created: now,
_params: vec![instance::id::set(config.instance_id)],
}),
)
.await?;
@ -258,10 +268,7 @@ impl LibraryManager {
debug!("Pushed library into manager '{id:?}'");
Ok(LibraryConfigWrapped {
uuid: id,
config: config.into(),
})
Ok(LibraryConfigWrapped { uuid: id, config })
}
pub(crate) async fn get_all_libraries_config(&self) -> Vec<LibraryConfigWrapped> {
@ -270,12 +277,16 @@ impl LibraryManager {
.await
.iter()
.map(|lib| LibraryConfigWrapped {
config: lib.config.clone().into(),
config: lib.config.clone(),
uuid: lib.id,
})
.collect()
}
pub(crate) async fn get_all_instances(&self) -> Vec<instance::Data> {
vec![] // TODO: Cache in memory
}
pub(crate) async fn edit(
&self,
id: Uuid,
@ -382,7 +393,7 @@ impl LibraryManager {
config_path: PathBuf,
node_context: NodeContext,
subscribers: &RwLock<Vec<Box<dyn SubscriberFn>>>,
create: Option<node::Create>,
create: Option<instance::Create>,
) -> Result<Library, LibraryManagerError> {
let db_path = db_path.as_ref();
let db_url = format!(
@ -398,66 +409,51 @@ impl LibraryManager {
}
let node_config = node_context.config.get().await;
let config = LibraryConfig::load_and_migrate(
&config_path,
&(node_config.id, node_config.keypair.peer_id(), db.clone()),
)
.await?;
let identity = Arc::new(Identity::from_bytes(&config.identity)?);
let config =
LibraryConfig::load_and_migrate(&config_path, &(node_config.clone(), db.clone()))
.await?;
let node_data = db
.node()
.find_unique(node::pub_id::equals(node_config.id.as_bytes().to_vec()))
let instance = db
.instance()
.find_unique(instance::id::equals(config.instance_id))
.exec()
.await?
.ok_or_else(|| LibraryManagerError::CurrentNodeNotFound(id.to_string()))?;
.ok_or_else(|| {
LibraryManagerError::CurrentInstanceNotFound(config.instance_id.to_string())
})?;
let identity = Arc::new(Identity::from_bytes(&instance.identity)?);
let instance_id = Uuid::from_slice(&instance.pub_id)?;
let curr_platform = Platform::current() as i32;
if node_data.platform != curr_platform {
let instance_node_id = Uuid::from_slice(&instance.node_id)?;
if instance_node_id != node_config.id
|| instance.node_platform != curr_platform
|| instance.node_name != node_config.name
{
info!(
"Detected change of platform for library '{}', was previously '{}' and will change to '{}'. Reconciling node data.",
id,
node_data.platform,
curr_platform
"Detected that the library '{}' has changed node from '{}' to '{}'. Reconciling node data...",
id, instance_node_id, node_config.id
);
db.node()
db.instance()
.update(
node::pub_id::equals(node_data.pub_id.clone()),
instance::id::equals(instance.id),
vec![
node::platform::set(curr_platform),
node::name::set(node_config.name.clone()),
instance::node_id::set(node_config.id.as_bytes().to_vec()),
instance::node_platform::set(curr_platform),
instance::node_name::set(node_config.name),
],
)
.exec()
.await?;
}
if node_data.name != node_config.name {
info!(
"Detected change of node name for library '{}', was previously '{}' and will change to '{}'. Reconciling node data.",
id,
node_data.name,
node_config.name,
);
db.node()
.update(
node::pub_id::equals(node_data.pub_id),
vec![node::name::set(node_config.name.clone())],
)
.exec()
.await?;
}
drop(node_config); // Let's be sure not to cause a future deadlock
// TODO: Move this reconciliation into P2P and do reconciliation of both local and remote nodes.
// let key_manager = Arc::new(KeyManager::new(vec![]).await?);
// seed_keymanager(&db, &key_manager).await?;
let (sync_manager, sync_rx) = SyncManager::new(&db, id);
let (sync_manager, sync_rx) = SyncManager::new(&db, instance_id);
Self::emit(
subscribers,
@ -467,13 +463,11 @@ impl LibraryManager {
let library = Library {
id,
local_id: node_data.id,
config,
// key_manager,
sync: Arc::new(sync_manager),
orphan_remover: OrphanRemoverActor::spawn(db.clone()),
db,
node_local_id: node_data.id,
node_context,
identity,
};
@ -483,7 +477,10 @@ impl LibraryManager {
for location in library
.db
.location()
.find_many(vec![location::node_id::equals(Some(node_data.id))])
.find_many(vec![
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
location::instance_id::equals(Some(instance.id)),
])
.exec()
.await?
{

View file

@ -54,3 +54,9 @@ impl Deref for LibraryName {
&self.0
}
}
impl From<LibraryName> for String {
fn from(name: LibraryName) -> Self {
name.0
}
}

View file

@ -25,7 +25,8 @@ pub(super) async fn check_online(
let location_path = maybe_missing(&location.path, "location.path").map(Path::new)?;
if location.node_id == Some(library.node_local_id) {
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id == Some(library.config.instance_id) {
match fs::metadata(&location_path).await {
Ok(_) => {
library.location_manager().add_online(pub_id).await;
@ -139,18 +140,19 @@ pub(super) async fn handle_remove_location_request(
) {
let key = (location_id, library.id);
if let Some(location) = get_location(location_id, &library).await {
if location.node_id == Some(library.node_local_id) {
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id == Some(library.config.instance_id) {
unwatch_location(location, library.id, locations_watched, locations_unwatched);
locations_unwatched.remove(&key);
forced_unwatch.remove(&key);
} else {
drop_location(
location_id,
library.id,
"Dropping location from location manager, because we don't have a `local_path` anymore",
locations_watched,
locations_unwatched
);
location_id,
library.id,
"Dropping location from location manager, because we don't have a `local_path` anymore",
locations_watched,
locations_unwatched
);
}
} else {
drop_location(

View file

@ -447,7 +447,8 @@ impl LocationManager {
// The time to check came for an already removed library, so we just ignore it
to_remove.remove(&key);
} else if let Some(location) = get_location(location_id, &library).await {
if location.node_id == Some(library.node_local_id) {
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id == Some(library.config.instance_id) {
let is_online = match check_online(&location, &library).await {
Ok(is_online) => is_online,
Err(e) => {

View file

@ -139,6 +139,7 @@ impl SpacedriveLocationMetadataFile {
self.write_metadata().await
}
#[allow(dead_code)]
pub(super) async fn update(
&mut self,
library_id: LibraryId,

View file

@ -7,12 +7,9 @@ use crate::{
file_identifier::{self, file_identifier_job::FileIdentifierJobInit},
preview::{shallow_thumbnailer, thumbnailer_job::ThumbnailerJobInit},
},
prisma::{file_path, indexer_rules_in_location, location, node, PrismaClient},
prisma::{file_path, indexer_rules_in_location, location, PrismaClient},
sync,
util::{
db::{chain_optional_iter, uuid_to_bytes},
error::FileIOError,
},
util::{db::chain_optional_iter, error::FileIOError},
};
use std::{
@ -289,7 +286,8 @@ impl LocationUpdateArgs {
)
.await?;
if location.node_id == Some(library.node_local_id) {
// TODO(N): This will probs fall apart with removable media.
if location.instance_id == Some(library.config.instance_id) {
if let Some(path) = &location.path {
if let Some(mut metadata) =
SpacedriveLocationMetadataFile::try_load(path).await?
@ -372,7 +370,8 @@ pub async fn scan_location(
library: &Library,
location: location_with_indexer_rules::Data,
) -> Result<(), JobManagerError> {
if location.node_id != Some(library.node_local_id) {
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id != Some(library.config.instance_id) {
return Ok(());
}
@ -405,7 +404,9 @@ pub async fn scan_location_sub_path(
sub_path: impl AsRef<Path>,
) -> Result<(), JobManagerError> {
let sub_path = sub_path.as_ref().to_path_buf();
if location.node_id != Some(library.node_local_id) {
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id != Some(library.config.instance_id) {
return Ok(());
}
@ -441,7 +442,8 @@ pub async fn light_scan_location(
) -> Result<(), JobError> {
let sub_path = sub_path.as_ref().to_path_buf();
if location.node_id != Some(library.node_local_id) {
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id != Some(library.config.instance_id) {
return Ok(());
}
@ -588,9 +590,9 @@ async fn create_location(
(location::path::NAME, json!(&location_path)),
(location::date_created::NAME, json!(date_created)),
(
location::node::NAME,
json!(sync::node::SyncId {
pub_id: uuid_to_bytes(library.id)
location::instance_id::NAME,
json!(sync::instance::SyncId {
id: library.config.instance_id,
}),
),
],
@ -602,7 +604,10 @@ async fn create_location(
location::name::set(Some(name.clone())),
location::path::set(Some(location_path)),
location::date_created::set(Some(date_created.into())),
location::node::connect(node::id::equals(library.node_local_id)),
location::instance_id::set(Some(library.config.instance_id)),
// location::instance::connect(instance::id::equals(
// library.config.instance_id.as_bytes().to_vec(),
// )),
],
)
.include(location_with_indexer_rules::include()),
@ -656,7 +661,10 @@ pub async fn delete_location(
.exec()
.await?;
if location.node_id == Some(library.node_local_id) {
// TODO: This should really be queued to the proper node so it will always run
// TODO: Deal with whether a location is online or not
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
if location.instance_id == Some(library.config.instance_id) {
if let Some(path) = &location.path {
if let Ok(Some(mut metadata)) = SpacedriveLocationMetadataFile::try_load(path).await {
metadata.remove_library(library.id).await?;
@ -710,7 +718,7 @@ impl From<location_with_indexer_rules::Data> for location::Data {
id: data.id,
pub_id: data.pub_id,
path: data.path,
node_id: data.node_id,
instance_id: data.instance_id,
name: data.name,
total_capacity: data.total_capacity,
available_capacity: data.available_capacity,
@ -719,9 +727,9 @@ impl From<location_with_indexer_rules::Data> for location::Data {
sync_preview_media: data.sync_preview_media,
hidden: data.hidden,
date_created: data.date_created,
node: None,
file_paths: None,
indexer_rules: None,
// instance: None,
}
}
}
@ -732,7 +740,7 @@ impl From<&location_with_indexer_rules::Data> for location::Data {
id: data.id,
pub_id: data.pub_id.clone(),
path: data.path.clone(),
node_id: data.node_id,
instance_id: data.instance_id,
name: data.name.clone(),
total_capacity: data.total_capacity,
available_capacity: data.available_capacity,
@ -741,9 +749,9 @@ impl From<&location_with_indexer_rules::Data> for location::Data {
sync_preview_media: data.sync_preview_media,
hidden: data.hidden,
date_created: data.date_created,
node: None,
file_paths: None,
indexer_rules: None,
// instance: None,
}
}
}

View file

@ -1,10 +1,13 @@
#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Remove once this is fully stablised
#![allow(dead_code)] // TODO: Remove once protocol is finished
mod p2p_manager;
mod pairing;
mod peer_metadata;
mod protocol;
pub use p2p_manager::*;
pub use pairing::*;
pub use peer_metadata::*;
pub use protocol::*;

View file

@ -2,23 +2,16 @@ use std::{
borrow::Cow,
collections::HashMap,
path::PathBuf,
str::FromStr,
sync::{
atomic::{AtomicU16, Ordering},
Arc,
},
sync::Arc,
time::{Duration, Instant},
};
use chrono::Utc;
use futures::Stream;
use sd_p2p::{
spaceblock::{BlockSize, SpaceblockRequest, Transfer},
spacetime::SpaceTimeStream,
spacetunnel::{Identity, Tunnel},
Event, Manager, ManagerError, MetadataManager, PeerId,
};
use sd_prisma::prisma::node;
use sd_sync::CRDTOperation;
use serde::Serialize;
use specta::Type;
@ -32,13 +25,13 @@ use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::{
library::{Library, LibraryManager, SubscriberEvent},
node::{NodeConfig, NodeConfigManager, Platform},
p2p::{NodeInformation, OperatingSystem, SyncRequestError, SPACEDRIVE_APP_ID},
library::{LibraryManager, SubscriberEvent},
node::{NodeConfig, NodeConfigManager},
p2p::{OperatingSystem, SPACEDRIVE_APP_ID},
sync::SyncMessage,
};
use super::{Header, PeerMetadata};
use super::{Header, PairingManager, PairingStatus, PeerMetadata};
/// The amount of time to wait for a Spacedrop request to be accepted or rejected before it's automatically rejected
const SPACEDROP_TIMEOUT: Duration = Duration::from_secs(60);
@ -56,7 +49,17 @@ pub enum P2PEvent {
peer_id: PeerId,
name: String,
},
// TODO: Expire peer + connection/disconnect
// Pairing was reuqest has come in.
// This will fire on the responder only.
PairingRequest {
id: u16,
name: String,
os: OperatingSystem,
},
PairingProgress {
id: u16,
status: PairingStatus,
}, // TODO: Expire peer + connection/disconnect
}
pub struct P2PManager {
@ -65,7 +68,7 @@ pub struct P2PManager {
spacedrop_pairing_reqs: Arc<Mutex<HashMap<Uuid, oneshot::Sender<Option<String>>>>>,
pub metadata_manager: Arc<MetadataManager<PeerMetadata>>,
pub spacedrop_progress: Arc<Mutex<HashMap<Uuid, broadcast::Sender<u8>>>>,
pairing_id: AtomicU16,
pub pairing: Arc<PairingManager>,
library_manager: Arc<LibraryManager>,
}
@ -76,7 +79,10 @@ impl P2PManager {
) -> Result<Arc<Self>, ManagerError> {
let (config, keypair) = {
let config = node_config.get().await;
(Self::config_to_metadata(&config), config.keypair)
(
Self::config_to_metadata(&config, &library_manager).await,
config.keypair,
)
};
let metadata_manager = MetadataManager::new(config);
@ -96,11 +102,13 @@ impl P2PManager {
let spacedrop_pairing_reqs = Arc::new(Mutex::new(HashMap::new()));
let spacedrop_progress = Arc::new(Mutex::new(HashMap::new()));
let pairing = PairingManager::new(manager.clone(), tx.clone(), library_manager.clone());
tokio::spawn({
let events = tx.clone();
let spacedrop_pairing_reqs = spacedrop_pairing_reqs.clone();
let spacedrop_progress = spacedrop_progress.clone();
let library_manager = library_manager.clone();
let pairing = pairing.clone();
async move {
let mut shutdown = false;
@ -124,28 +132,22 @@ impl P2PManager {
// TODO(Spacedrop): Disable Spacedrop for now
// event.dial().await;
}
Event::PeerMessage(mut event) => {
Event::PeerMessage(event) => {
let events = events.clone();
let spacedrop_pairing_reqs = spacedrop_pairing_reqs.clone();
let spacedrop_progress = spacedrop_progress.clone();
let library_manager = library_manager.clone();
let pairing = pairing.clone();
tokio::spawn(async move {
let header = Header::from_stream(&mut event.stream).await.unwrap();
let mut stream = event.stream;
let header = Header::from_stream(&mut stream).await.unwrap();
match header {
Header::Ping => {
debug!("Received ping from peer '{}'", event.peer_id);
}
Header::Spacedrop(req) => {
let mut stream = match event.stream {
SpaceTimeStream::Unicast(stream) => stream,
_ => {
// TODO: Return an error to the remote client
error!("Received Spacedrop request from peer '{}' but it's not a unicast stream!", event.peer_id);
return;
}
};
let id = Uuid::new_v4();
let (tx, rx) = oneshot::channel();
@ -201,90 +203,14 @@ impl P2PManager {
}
};
}
Header::Pair(library_id) => {
let mut stream = match event.stream {
SpaceTimeStream::Unicast(stream) => stream,
_ => {
// TODO: Return an error to the remote client
error!("Received Spacedrop request from peer '{}' but it's not a unicast stream!", event.peer_id);
return;
}
};
info!(
"Starting pairing with '{}' for library '{library_id}'",
event.peer_id
);
// TODO: Authentication and security stuff
let library =
library_manager.get_library(library_id).await.unwrap();
debug!("Waiting for nodeinfo from the remote node");
let remote_info = NodeInformation::from_stream(&mut stream)
.await
.unwrap();
debug!(
"Received nodeinfo from the remote node: {:?}",
remote_info
);
debug!("Creating node in database");
node::Create {
pub_id: remote_info.pub_id.as_bytes().to_vec(),
name: remote_info.name,
platform: remote_info.platform as i32,
date_created: Utc::now().into(),
_params: vec![
node::identity::set(Some(
remote_info.public_key.to_bytes().to_vec(),
)),
node::node_peer_id::set(Some(
event.peer_id.to_string(),
)),
],
}
// TODO: Should this be in a transaction in case it fails?
.to_query(&library.db)
.exec()
.await
.unwrap();
// TODO(@oscar): check if this should be library stuff
let info = NodeInformation {
pub_id: library.config.node_id,
name: library.config.name.to_string(),
public_key: library.identity.to_remote_identity(),
platform: Platform::current(),
};
debug!("Sending nodeinfo to the remote node");
stream.write_all(&info.to_bytes()).await.unwrap();
info!(
"Paired with '{}' for library '{library_id}'",
remote_info.pub_id
); // TODO: Use hash of identity cert here cause pub_id can be forged
Header::Pair => {
pairing.responder(event.peer_id, stream).await;
}
Header::Sync(library_id) => {
let stream = match event.stream {
SpaceTimeStream::Unicast(stream) => stream,
_ => {
// TODO: Return an error to the remote client
error!("Received Spacedrop request from peer '{}' but it's not a unicast stream!", event.peer_id);
return;
}
};
let mut stream = Tunnel::from_stream(stream).await.unwrap();
let mut len = [0; 4];
stream
.read_exact(&mut len)
.await
.map_err(SyncRequestError::PayloadLenIoError)
.unwrap();
stream.read_exact(&mut len).await.unwrap();
let len = u32::from_le_bytes(len);
let mut buf = vec![0; len as usize]; // TODO: Designed for easily being able to be DOS the current Node
@ -315,6 +241,9 @@ impl P2PManager {
}
});
}
Event::PeerBroadcast(_event) => {
// todo!();
}
Event::Shutdown => {
shutdown = true;
break;
@ -336,12 +265,12 @@ impl P2PManager {
// https://docs.rs/system_shutdown/latest/system_shutdown/
let this = Arc::new(Self {
pairing,
events: (tx, rx),
manager,
spacedrop_pairing_reqs,
metadata_manager,
spacedrop_progress,
pairing_id: AtomicU16::new(0),
library_manager: library_manager.clone(),
});
@ -378,20 +307,34 @@ impl P2PManager {
Ok(this)
}
fn config_to_metadata(config: &NodeConfig) -> PeerMetadata {
async fn config_to_metadata(
config: &NodeConfig,
library_manager: &LibraryManager,
) -> PeerMetadata {
PeerMetadata {
name: config.name.clone(),
operating_system: Some(OperatingSystem::get_os()),
version: Some(env!("CARGO_PKG_VERSION").to_string()),
email: config.p2p_email.clone(),
img_url: config.p2p_img_url.clone(),
instances: library_manager
.get_all_instances()
.await
.into_iter()
.filter_map(|i| {
Identity::from_bytes(&i.identity)
.map(|i| hex::encode(i.public_key().to_bytes()))
.ok()
})
.collect(),
}
}
#[allow(unused)] // TODO: Should probs be using this
pub async fn update_metadata(&self, node_config_manager: &NodeConfigManager) {
self.metadata_manager
.update(Self::config_to_metadata(&node_config_manager.get().await));
self.metadata_manager.update(
Self::config_to_metadata(&node_config_manager.get().await, &self.library_manager).await,
);
}
pub async fn accept_spacedrop(&self, id: Uuid, path: String) {
@ -410,63 +353,6 @@ impl P2PManager {
self.events.0.subscribe()
}
pub fn pair(&self, peer_id: PeerId, lib: Library) -> u16 {
let pairing_id = self.pairing_id.fetch_add(1, Ordering::SeqCst);
let manager = self.manager.clone();
tokio::spawn(async move {
info!(
"Started pairing session '{pairing_id}' with peer '{peer_id}' for library '{}'",
lib.id
);
let mut stream = manager.stream(peer_id).await.unwrap();
let header = Header::Pair(lib.id);
stream.write_all(&header.to_bytes()).await.unwrap();
// TODO: Apply some security here cause this is so open to MITM
// TODO: Signing and a SPAKE style pin prompt
let info = NodeInformation {
pub_id: lib.config.node_id,
name: lib.config.name.to_string(),
public_key: lib.identity.to_remote_identity(),
platform: Platform::current(),
};
debug!("Sending nodeinfo to remote node");
stream.write_all(&info.to_bytes()).await.unwrap();
debug!("Waiting for nodeinfo from the remote node");
let remote_info = NodeInformation::from_stream(&mut stream).await.unwrap();
debug!("Received nodeinfo from the remote node: {:?}", remote_info);
node::Create {
pub_id: remote_info.pub_id.as_bytes().to_vec(),
name: remote_info.name,
platform: remote_info.platform as i32,
date_created: Utc::now().into(),
_params: vec![
node::identity::set(Some(remote_info.public_key.to_bytes().to_vec())),
node::node_peer_id::set(Some(peer_id.to_string())),
],
}
// TODO: Should this be in a transaction in case it fails?
.to_query(&lib.db)
.exec()
.await
.unwrap();
info!(
"Paired with '{}' for library '{}'",
remote_info.pub_id, lib.id
); // TODO: Use hash of identity cert here cause pub_id can be forged
});
pairing_id
}
pub async fn broadcast_sync_events(
&self,
library_id: Uuid,
@ -488,35 +374,38 @@ impl P2PManager {
// TODO: Establish a connection to them
let library = self.library_manager.get_library(library_id).await.unwrap();
let _library = self.library_manager.get_library(library_id).await.unwrap();
todo!();
// TODO: probs cache this query in memory cause this is gonna be stupid frequent
let target_nodes = library
.db
.node()
.find_many(vec![])
.exec()
.await
.unwrap()
.into_iter()
.map(|n| {
PeerId::from_str(&n.node_peer_id.expect("Node was missing 'node_peer_id'!"))
.unwrap()
})
.collect::<Vec<_>>();
// let target_nodes = library
// .db
// .node()
// .find_many(vec![])
// .exec()
// .await
// .unwrap()
// .into_iter()
// .map(|n| {
// PeerId::from_str(&n.node_peer_id.expect("Node was missing 'node_peer_id'!"))
// .unwrap()
// })
// .collect::<Vec<_>>();
info!(
"Sending sync messages for library '{}' to nodes with peer id's '{:?}'",
library_id, target_nodes
);
// info!(
// "Sending sync messages for library '{}' to nodes with peer id's '{:?}'",
// library_id, target_nodes
// );
// TODO: Do in parallel
for peer_id in target_nodes {
let stream = self.manager.stream(peer_id).await.map_err(|_| ()).unwrap(); // TODO: handle providing incorrect peer id
// // TODO: Do in parallel
// for peer_id in target_nodes {
// let stream = self.manager.stream(peer_id).await.map_err(|_| ()).unwrap(); // TODO: handle providing incorrect peer id
let mut tunnel = Tunnel::from_stream(stream).await.unwrap();
// let mut tunnel = Tunnel::from_stream(stream).await.unwrap();
tunnel.write_all(&head_buf).await.unwrap();
}
// tunnel.write_all(&head_buf).await.unwrap();
// }
}
pub async fn ping(&self) {

266
core/src/p2p/pairing/mod.rs Normal file
View file

@ -0,0 +1,266 @@
use std::{
collections::HashMap,
sync::{
atomic::{AtomicU16, Ordering},
Arc, RwLock,
},
};
use chrono::Utc;
use futures::channel::oneshot;
use sd_p2p::{spacetunnel::Identity, Manager, PeerId};
use serde::{Deserialize, Serialize};
use specta::Type;
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
sync::broadcast,
};
use tracing::info;
use uuid::Uuid;
mod proto;
use proto::*;
use crate::{
library::{LibraryManager, LibraryName},
node::{NodeConfig, Platform},
p2p::Header,
};
use super::{P2PEvent, PeerMetadata};
pub struct PairingManager {
id: AtomicU16,
events_tx: broadcast::Sender<P2PEvent>,
pairing_response: RwLock<HashMap<u16, oneshot::Sender<PairingDecision>>>,
manager: Arc<Manager<PeerMetadata>>,
library_manager: Arc<LibraryManager>,
}
impl PairingManager {
pub fn new(
manager: Arc<Manager<PeerMetadata>>,
events_tx: broadcast::Sender<P2PEvent>,
library_manager: Arc<LibraryManager>,
) -> Arc<Self> {
Arc::new(Self {
id: AtomicU16::new(0),
events_tx,
pairing_response: RwLock::new(HashMap::new()),
manager,
library_manager,
})
}
fn emit_progress(&self, id: u16, status: PairingStatus) {
self.events_tx
.send(P2PEvent::PairingProgress { id, status })
.ok();
}
pub fn decision(&self, id: u16, decision: PairingDecision) {
if let Some(tx) = self.pairing_response.write().unwrap().remove(&id) {
tx.send(decision).ok();
}
}
// TODO: Error handling
pub async fn originator(self: Arc<Self>, peer_id: PeerId, node_config: NodeConfig) -> u16 {
// TODO: Timeout for max number of pairings in a time period
let pairing_id = self.id.fetch_add(1, Ordering::SeqCst);
self.emit_progress(pairing_id, PairingStatus::EstablishingConnection);
info!("Beginning pairing '{pairing_id}' as originator to remote peer '{peer_id}'");
tokio::spawn(async move {
let mut stream = self.manager.stream(peer_id).await.unwrap();
stream.write_all(&Header::Pair.to_bytes()).await.unwrap();
// TODO: Ensure both clients are on a compatible version cause Prisma model changes will cause issues
// 1. Create new instance for originator and send it to the responder
self.emit_progress(pairing_id, PairingStatus::PairingRequested);
let now = Utc::now();
let req = PairingRequest(Instance {
id: Uuid::new_v4(),
identity: Identity::new(), // TODO: Public key only
node_id: node_config.id,
node_name: node_config.name.clone(),
node_platform: Platform::current(),
last_seen: now,
date_created: now,
});
stream.write_all(&req.to_bytes()).await.unwrap();
// 2.
match PairingResponse::from_stream(&mut stream).await.unwrap() {
PairingResponse::Accepted {
library_id,
library_name,
library_description,
instances,
} => {
info!("Pairing '{pairing_id}' accepted by remote into library '{library_id}'");
// TODO: Log all instances and library info
self.emit_progress(
pairing_id,
PairingStatus::PairingInProgress {
library_name: library_name.clone(),
library_description: library_description.clone(),
},
);
// TODO: Future - Library in pairing state
// TODO: Create library
let library_config = self
.library_manager
.create_with_uuid(
library_id,
LibraryName::new(library_name).unwrap(),
library_description,
node_config,
)
.await
.unwrap();
let library = self
.library_manager
.get_library(library_config.uuid)
.await
.unwrap();
library
.db
.instance()
.create_many(instances.into_iter().map(|i| i.into()).collect())
.exec()
.await
.unwrap();
// 3.
// TODO: Either rollback or update library out of pairing state
// TODO: Fake initial sync
// TODO: Done message to frontend
self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id));
tokio::time::sleep(std::time::Duration::from_secs(30)).await; // TODO
}
PairingResponse::Rejected => {
info!("Pairing '{pairing_id}' rejected by remote");
self.emit_progress(pairing_id, PairingStatus::PairingRejected);
}
}
});
pairing_id
}
pub async fn responder(
self: Arc<Self>,
peer_id: PeerId,
mut stream: impl AsyncRead + AsyncWrite + Unpin,
) {
let pairing_id = self.id.fetch_add(1, Ordering::SeqCst);
self.emit_progress(pairing_id, PairingStatus::EstablishingConnection);
info!("Beginning pairing '{pairing_id}' as responder to remote peer '{peer_id}'");
// let inner = || async move {
let remote_instance = PairingRequest::from_stream(&mut stream).await.unwrap().0;
self.emit_progress(pairing_id, PairingStatus::PairingDecisionRequest);
self.events_tx
.send(P2PEvent::PairingRequest {
id: pairing_id,
name: remote_instance.node_name,
os: remote_instance.node_platform.into(),
})
.ok();
// Prompt the user and wait
// TODO: After 1 minute remove channel from map and assume it was rejected
let (tx, rx) = oneshot::channel();
self.pairing_response
.write()
.unwrap()
.insert(pairing_id, tx);
let PairingDecision::Accept(library_id) = rx.await.unwrap() else {
info!("The user rejected pairing '{pairing_id}'!");
// self.emit_progress(pairing_id, PairingStatus::PairingRejected); // TODO: Event to remove from frontend index
stream.write_all(&PairingResponse::Rejected.to_bytes()).await.unwrap();
return;
};
info!("The user accepted pairing '{pairing_id}' for library '{library_id}'!");
let library = self.library_manager.get_library(library_id).await.unwrap();
stream
.write_all(
&PairingResponse::Accepted {
library_id: library.id,
library_name: library.config.name.into(),
library_description: library.config.description.clone(),
instances: library
.db
.instance()
.find_many(vec![])
.exec()
.await
.unwrap()
.into_iter()
.map(|i| Instance {
id: Uuid::from_slice(&i.pub_id).unwrap(),
// TODO: If `i.identity` contains a public/private keypair replace it with the public key
identity: Identity::from_bytes(&i.identity).unwrap(),
node_id: Uuid::from_slice(&i.node_id).unwrap(),
node_name: i.node_name,
node_platform: Platform::try_from(i.node_platform as u8)
.unwrap_or(Platform::Unknown),
last_seen: i.last_seen.into(),
date_created: i.date_created.into(),
})
.collect(),
}
.to_bytes(),
)
.await
.unwrap();
// TODO: Pairing confirmation + rollback
stream.flush().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(30)).await; // TODO
// };
// inner().await.unwrap();
}
}
#[derive(Debug, Type, Serialize, Deserialize)]
#[serde(tag = "decision", content = "libraryId", rename_all = "camelCase")]
pub enum PairingDecision {
Accept(Uuid),
Reject,
}
#[derive(Debug, Hash, Clone, Serialize, Type)]
#[serde(tag = "type", content = "data")]
pub enum PairingStatus {
EstablishingConnection,
PairingRequested,
PairingDecisionRequest,
PairingInProgress {
library_name: String,
library_description: Option<String>,
},
InitialSyncProgress(u8),
PairingComplete(Uuid),
PairingRejected,
}
// TODO: Unit tests

View file

@ -0,0 +1,302 @@
use std::str::FromStr;
use chrono::{DateTime, Utc};
use sd_p2p::{
proto::{decode, encode},
spacetunnel::Identity,
};
use sd_prisma::prisma::instance;
use tokio::io::{AsyncRead, AsyncReadExt};
use uuid::Uuid;
use crate::node::Platform;
/// Terminology:
/// Instance - DB model which represents a single `.db` file.
/// Originator - begins the pairing process and is asking to join a library that will be selected by the responder.
/// Responder - is in-charge of accepting or rejecting the originator's request and then selecting which library to "share".
/// A modified version of `prisma::instance::Data` that uses proper validated types for the fields.
#[derive(Debug, PartialEq)]
pub struct Instance {
pub id: Uuid,
pub identity: Identity,
pub node_id: Uuid,
pub node_name: String,
pub node_platform: Platform,
pub last_seen: DateTime<Utc>,
pub date_created: DateTime<Utc>,
}
impl From<Instance> for instance::CreateUnchecked {
fn from(i: Instance) -> Self {
Self {
pub_id: i.id.as_bytes().to_vec(),
identity: i.identity.to_bytes(),
node_id: i.node_id.as_bytes().to_vec(),
node_name: i.node_name,
node_platform: i.node_platform as i32,
last_seen: i.last_seen.into(),
date_created: i.date_created.into(),
_params: vec![],
}
}
}
/// 1. Request for pairing to a library that is owned and will be selected by the responder.
/// Sent `Originator` -> `Responder`.
#[derive(Debug, PartialEq)]
pub struct PairingRequest(/* Originator's instance */ pub Instance);
/// 2. Decision for whether pairing was accepted or rejected once a library is decided on by the user.
/// Sent `Responder` -> `Originator`.
#[derive(Debug, PartialEq)]
pub enum PairingResponse {
/// Pairing was accepted and the responder chose the library of their we are pairing to.
Accepted {
// Library information
library_id: Uuid,
library_name: String,
library_description: Option<String>,
// All instances in the library
// Copying these means we are instantly paired with everyone else that is already in the library
// NOTE: It's super important the `identity` field is converted from a private key to a public key before sending!!!
instances: Vec<Instance>,
},
// Process will terminate as the user doesn't want to pair
Rejected,
}
/// 3. Tell the responder that the database was correctly paired.
/// Sent `Originator` -> `Responder`.
#[derive(Debug, PartialEq)]
pub enum PairingConfirmation {
Ok,
Error,
}
impl Instance {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> Result<Self, (&'static str, decode::Error)> {
Ok(Self {
id: decode::uuid(stream).await.map_err(|e| ("id", e))?,
identity: Identity::from_bytes(
&decode::buf(stream).await.map_err(|e| ("identity", e))?,
)
.unwrap(), // TODO: Error handling
node_id: decode::uuid(stream).await.map_err(|e| ("node_id", e))?,
node_name: decode::string(stream).await.map_err(|e| ("node_name", e))?,
node_platform: stream
.read_u8()
.await
.map(|b| Platform::try_from(b).unwrap_or(Platform::Unknown))
.map_err(|e| ("node_platform", e.into()))?,
last_seen: DateTime::<Utc>::from_str(
&decode::string(stream).await.map_err(|e| ("last_seen", e))?,
)
.unwrap(), // TODO: Error handling
date_created: DateTime::<Utc>::from_str(
&decode::string(stream)
.await
.map_err(|e| ("date_created", e))?,
)
.unwrap(), // TODO: Error handling
})
}
pub fn to_bytes(&self) -> Vec<u8> {
let Self {
id,
identity,
node_id,
node_name,
node_platform,
last_seen,
date_created,
} = self;
let mut buf = Vec::new();
encode::uuid(&mut buf, id);
encode::buf(&mut buf, &identity.to_bytes());
encode::uuid(&mut buf, node_id);
encode::string(&mut buf, node_name);
buf.push(*node_platform as u8);
encode::string(&mut buf, &last_seen.to_string());
encode::string(&mut buf, &date_created.to_string());
buf
}
}
impl PairingRequest {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> Result<Self, (&'static str, decode::Error)> {
Ok(Self(Instance::from_stream(stream).await?))
}
pub fn to_bytes(&self) -> Vec<u8> {
let Self(instance) = self;
Instance::to_bytes(instance)
}
}
impl PairingResponse {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> Result<Self, (&'static str, decode::Error)> {
// TODO: Error handling
match stream.read_u8().await.unwrap() {
0 => Ok(Self::Accepted {
library_id: decode::uuid(stream).await.map_err(|e| ("library_id", e))?,
library_name: decode::string(stream)
.await
.map_err(|e| ("library_name", e))?,
library_description: match decode::string(stream)
.await
.map_err(|e| ("library_description", e))?
{
s if s.is_empty() => None,
s => Some(s),
},
instances: {
let len = stream.read_u16_le().await.unwrap();
let mut instances = Vec::with_capacity(len as usize); // TODO: Prevent DOS
for _ in 0..len {
instances.push(Instance::from_stream(stream).await.unwrap());
}
instances
},
}),
1 => Ok(Self::Rejected),
_ => todo!(),
}
}
pub fn to_bytes(&self) -> Vec<u8> {
match self {
Self::Accepted {
library_id,
library_name,
library_description,
instances,
} => {
let mut buf = vec![0];
encode::uuid(&mut buf, library_id);
encode::string(&mut buf, library_name);
encode::string(&mut buf, library_description.as_deref().unwrap_or(""));
buf.extend((instances.len() as u16).to_le_bytes());
for instance in instances {
buf.extend(instance.to_bytes());
}
buf
}
Self::Rejected => vec![1],
}
}
}
impl PairingConfirmation {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> Result<Self, (&'static str, decode::Error)> {
// TODO: Error handling
match stream.read_u8().await.unwrap() {
0 => Ok(Self::Ok),
1 => Ok(Self::Error),
_ => {
todo!();
}
}
}
pub fn to_bytes(&self) -> Vec<u8> {
match self {
Self::Ok => vec![0],
Self::Error => vec![1],
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_types() {
let instance = || Instance {
id: Uuid::new_v4(),
identity: Identity::new(),
node_id: Uuid::new_v4(),
node_name: "Node Name".into(),
node_platform: Platform::current(),
last_seen: Utc::now().into(),
date_created: Utc::now().into(),
};
{
let original = PairingRequest(instance());
let mut cursor = std::io::Cursor::new(original.to_bytes());
let result = PairingRequest::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = PairingResponse::Accepted {
library_id: Uuid::new_v4(),
library_name: "Library Name".into(),
library_description: Some("Library Description".into()),
instances: vec![instance(), instance(), instance()],
};
let mut cursor = std::io::Cursor::new(original.to_bytes());
let result = PairingResponse::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = PairingResponse::Accepted {
library_id: Uuid::new_v4(),
library_name: "Library Name".into(),
library_description: None,
instances: vec![],
};
let mut cursor = std::io::Cursor::new(original.to_bytes());
let result = PairingResponse::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = PairingResponse::Rejected;
let mut cursor = std::io::Cursor::new(original.to_bytes());
let result = PairingResponse::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = PairingConfirmation::Ok;
let mut cursor = std::io::Cursor::new(original.to_bytes());
let result = PairingConfirmation::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = PairingConfirmation::Error;
let mut cursor = std::io::Cursor::new(original.to_bytes());
let result = PairingConfirmation::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
}
}

View file

@ -1,9 +1,12 @@
use std::{collections::HashMap, env, str::FromStr};
use itertools::Itertools;
use sd_p2p::Metadata;
use serde::{Deserialize, Serialize};
use specta::Type;
use crate::node::Platform;
#[derive(Debug, Clone, Type, Serialize, Deserialize)]
pub struct PeerMetadata {
pub(super) name: String,
@ -11,11 +14,12 @@ pub struct PeerMetadata {
pub(super) version: Option<String>,
pub(super) email: Option<String>,
pub(super) img_url: Option<String>,
pub(super) instances: Vec<String>,
}
impl Metadata for PeerMetadata {
fn to_hashmap(self) -> HashMap<String, String> {
let mut map = HashMap::with_capacity(3);
let mut map = HashMap::with_capacity(5);
map.insert("name".to_owned(), self.name);
if let Some(os) = self.operating_system {
map.insert("os".to_owned(), os.to_string());
@ -29,6 +33,7 @@ impl Metadata for PeerMetadata {
if let Some(img_url) = self.img_url {
map.insert("img_url".to_owned(), img_url);
}
map.insert("instances".to_owned(), self.instances.into_iter().join(","));
map
}
@ -51,6 +56,15 @@ impl Metadata for PeerMetadata {
version: data.get("version").map(|v| v.to_owned()),
email: data.get("email").map(|v| v.to_owned()),
img_url: data.get("img_url").map(|v| v.to_owned()),
instances: data
.get("instances")
.ok_or_else(|| {
"DNS record for field 'instances' missing. Unable to decode 'PeerMetadata'!"
.to_owned()
})?
.split(',')
.map(|s| s.parse().map_err(|_| "Unable to parse instance 'Uuid'!"))
.collect::<Result<Vec<_>, _>>()?,
})
}
}
@ -67,6 +81,20 @@ pub enum OperatingSystem {
Other(String),
}
// TODO: Should `Platform` and `OperatingSystem` be merged into one?
impl From<Platform> for OperatingSystem {
fn from(platform: Platform) -> Self {
match platform {
Platform::Unknown => OperatingSystem::Other("Unknown".into()),
Platform::Windows => OperatingSystem::Windows,
Platform::Linux => OperatingSystem::Linux,
Platform::MacOS => OperatingSystem::MacOS,
Platform::IOS => OperatingSystem::Ios,
Platform::Android => OperatingSystem::Android,
}
}
}
impl OperatingSystem {
pub fn get_os() -> Self {
match env::consts::OS {

View file

@ -1,88 +1,54 @@
use std::string::FromUtf8Error;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::io::AsyncReadExt;
use uuid::Uuid;
use sd_p2p::{
proto::{decode, encode},
spaceblock::{SpaceblockRequest, SpacedropRequestError},
spacetime::SpaceTimeStream,
spacetunnel::{IdentityErr, RemoteIdentity},
spacetime::UnicastStream,
};
use crate::node::Platform;
/// TODO
#[derive(Debug, PartialEq, Eq)]
pub enum Header {
// TODO: Split out cause this is a broadcast
Ping,
Spacedrop(SpaceblockRequest),
Pair(Uuid),
Pair,
Sync(Uuid),
}
#[derive(Debug, Error)]
pub enum SyncRequestError {
#[error("io error reading library id: {0}")]
LibraryIdIoError(std::io::Error),
#[error("io error decoding library id: {0}")]
ErrorDecodingLibraryId(uuid::Error),
#[error("io error reading sync payload len: {0}")]
PayloadLenIoError(std::io::Error),
}
#[derive(Debug, Error)]
pub enum HeaderError {
#[error("io error reading discriminator: {0}")]
DiscriminatorIoError(std::io::Error),
DiscriminatorIo(std::io::Error),
#[error("invalid discriminator '{0}'")]
InvalidDiscriminator(u8),
DiscriminatorInvalid(u8),
#[error("error reading spacedrop request: {0}")]
SpacedropRequestError(#[from] SpacedropRequestError),
SpacedropRequest(#[from] SpacedropRequestError),
#[error("error reading sync request: {0}")]
SyncRequestError(#[from] SyncRequestError),
#[error("invalid request. Spacedrop requires a unicast stream!")]
SpacedropOverMulticastIsForbidden,
SyncRequest(decode::Error),
}
impl Header {
pub async fn from_stream(stream: &mut SpaceTimeStream) -> Result<Self, HeaderError> {
pub async fn from_stream(stream: &mut UnicastStream) -> Result<Self, HeaderError> {
let discriminator = stream
.read_u8()
.await
.map_err(HeaderError::DiscriminatorIoError)?;
.map_err(HeaderError::DiscriminatorIo)?;
match discriminator {
0 => match stream {
SpaceTimeStream::Unicast(stream) => Ok(Self::Spacedrop(
SpaceblockRequest::from_stream(stream).await?,
)),
_ => Err(HeaderError::SpacedropOverMulticastIsForbidden),
},
0 => Ok(Self::Spacedrop(
SpaceblockRequest::from_stream(stream).await?,
)),
1 => Ok(Self::Ping),
2 => {
let mut uuid = [0u8; 16];
stream
.read_exact(&mut uuid)
2 => Ok(Self::Pair),
3 => Ok(Self::Sync(
decode::uuid(stream)
.await
.map_err(SyncRequestError::LibraryIdIoError)?;
Ok(Self::Pair(
Uuid::from_slice(&uuid).map_err(SyncRequestError::ErrorDecodingLibraryId)?,
))
}
3 => {
let mut uuid = [0u8; 16];
stream
.read_exact(&mut uuid)
.await
.map_err(SyncRequestError::LibraryIdIoError)?;
Ok(Self::Sync(
Uuid::from_slice(&uuid).map_err(SyncRequestError::ErrorDecodingLibraryId)?,
))
}
d => Err(HeaderError::InvalidDiscriminator(d)),
.map_err(HeaderError::SyncRequest)?,
)),
d => Err(HeaderError::DiscriminatorInvalid(d)),
}
}
@ -94,176 +60,38 @@ impl Header {
bytes
}
Self::Ping => vec![1],
Self::Pair(library_id) => {
let mut bytes = vec![2];
bytes.extend_from_slice(library_id.as_bytes());
bytes
}
Self::Pair => vec![2],
Self::Sync(uuid) => {
let mut bytes = vec![3];
bytes.extend_from_slice(uuid.as_bytes());
encode::uuid(&mut bytes, uuid);
bytes
}
}
}
}
#[derive(Debug, Error)]
pub enum NodeInformationError {
#[error("io error decoding node information library pub_id: {0}")]
ErrorDecodingUuid(std::io::Error),
#[error("error formatting node information library pub_id: {0}")]
UuidFormatError(uuid::Error),
#[error("io error reading node information library name length: {0}")]
NameLenIoError(std::io::Error),
#[error("io error decoding node information library name: {0}")]
ErrorDecodingName(std::io::Error),
#[error("error formatting node information library name: {0}")]
NameFormatError(FromUtf8Error),
#[error("io error reading node information public key length: {0}")]
PublicKeyLenIoError(std::io::Error),
#[error("io error decoding node information public key: {0}")]
ErrorDecodingPublicKey(std::io::Error),
#[error("error decoding public key: {0}")]
ErrorParsingPublicKey(#[from] IdentityErr),
#[error("io error reading node information platform id: {0}")]
PlatformIdError(std::io::Error),
}
/// is shared between nodes during pairing and contains the information to identify the node.
#[derive(Debug, PartialEq, Eq)]
pub struct NodeInformation {
pub pub_id: Uuid,
pub name: String,
pub public_key: RemoteIdentity,
pub platform: Platform,
}
impl NodeInformation {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> Result<Self, NodeInformationError> {
let pub_id = {
let mut buf = vec![0u8; 16];
stream
.read_exact(&mut buf)
.await
.map_err(NodeInformationError::ErrorDecodingUuid)?;
Uuid::from_slice(&buf).map_err(NodeInformationError::UuidFormatError)?
};
let name = {
let len = stream
.read_u16_le()
.await
.map_err(NodeInformationError::NameLenIoError)?;
let mut buf = vec![0u8; len as usize];
stream
.read_exact(&mut buf)
.await
.map_err(NodeInformationError::ErrorDecodingName)?;
String::from_utf8(buf).map_err(NodeInformationError::NameFormatError)?
};
let public_key = {
let len = stream
.read_u16_le()
.await
.map_err(NodeInformationError::PublicKeyLenIoError)?;
let mut buf = vec![0u8; len as usize];
stream
.read_exact(&mut buf)
.await
.map_err(NodeInformationError::ErrorDecodingPublicKey)?;
RemoteIdentity::from_bytes(&buf)?
};
let platform = stream
.read_u8()
.await
.map_err(NodeInformationError::PlatformIdError)?;
Ok(Self {
pub_id,
name,
public_key,
platform: Platform::try_from(platform).unwrap_or(Platform::Unknown),
})
}
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::new();
// Pub id
buf.extend(self.pub_id.as_bytes());
// Name
let len_buf = (self.name.len() as u16).to_le_bytes();
if self.name.len() > u16::MAX as usize {
panic!("Name is too long!"); // TODO: Error handling
}
buf.extend_from_slice(&len_buf);
buf.extend(self.name.as_bytes());
// Public key // TODO: Can I use a fixed size array?
let pk = self.public_key.to_bytes();
let len_buf = (pk.len() as u16).to_le_bytes();
if pk.len() > u16::MAX as usize {
panic!("Public key is too long!"); // TODO: Error handling
}
buf.extend_from_slice(&len_buf);
buf.extend(pk);
// Platform
buf.push(self.platform as u8);
buf
}
}
#[cfg(test)]
mod tests {
use super::*;
use sd_p2p::spacetunnel::Identity;
// use super::*;
#[tokio::test]
async fn test_node_information() {
let original = NodeInformation {
pub_id: Uuid::new_v4(),
name: "Name".into(),
public_key: Identity::new().to_remote_identity(),
platform: Platform::current(),
};
#[test]
fn test_header() {
// TODO: Finish this
let buf = original.to_bytes();
let mut cursor = std::io::Cursor::new(buf);
let info = NodeInformation::from_stream(&mut cursor).await.unwrap();
// assert_eq!(
// Header::from_bytes(&Header::Ping.to_bytes()),
// Ok(Header::Ping)
// );
assert_eq!(original, info);
// assert_eq!(
// Header::from_bytes(&Header::Spacedrop.to_bytes()),
// Ok(Header::Spacedrop)
// );
// let uuid = Uuid::new_v4();
// assert_eq!(
// Header::from_bytes(&Header::Sync(uuid).to_bytes()),
// Ok(Header::Sync(uuid))
// );
}
// TODO: Unit test it because binary protocols are error prone
// #[test]
// fn test_proto() {
// assert_eq!(
// Header::from_bytes(&Header::Ping.to_bytes()),
// Ok(Header::Ping)
// );
// assert_eq!(
// Header::from_bytes(&Header::Spacedrop.to_bytes()),
// Ok(Header::Spacedrop)
// );
// let uuid = Uuid::new_v4();
// assert_eq!(
// Header::from_bytes(&Header::Sync(uuid).to_bytes()),
// Ok(Header::Sync(uuid))
// );
// }
}

View file

@ -0,0 +1,5 @@
// TODO: Define JSON file + open/close it
// TODO: Define messages for process + SPAKE part
// TODO: How is this gonna hook into the frontend?

View file

@ -25,7 +25,7 @@ impl LibraryPreferences {
pub async fn write(self, db: &PrismaClient) -> prisma_client_rust::Result<()> {
let kvs = self.to_kvs();
db._batch(kvs.to_upserts(&db)).await?;
db._batch(kvs.to_upserts(db)).await?;
Ok(())
}

View file

@ -21,21 +21,21 @@ pub enum SyncMessage {
pub struct SyncManager {
db: Arc<PrismaClient>,
node: Uuid,
instance: Uuid,
_clocks: HashMap<Uuid, NTP64>,
clock: HLC,
pub tx: Sender<SyncMessage>,
}
impl SyncManager {
pub fn new(db: &Arc<PrismaClient>, node: Uuid) -> (Self, Receiver<SyncMessage>) {
pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> (Self, Receiver<SyncMessage>) {
let (tx, rx) = broadcast::channel(64);
(
Self {
db: db.clone(),
node,
clock: HLCBuilder::new().with_id(node.into()).build(),
instance,
clock: HLCBuilder::new().with_id(instance.into()).build(),
_clocks: Default::default(),
tx,
},
@ -67,7 +67,7 @@ impl SyncManager {
to_vec(&shared_op.record_id).unwrap(),
kind.to_string(),
to_vec(&shared_op.data).unwrap(),
node::pub_id::equals(op.node.as_bytes().to_vec()),
instance::pub_id::equals(op.instance.as_bytes().to_vec()),
vec![],
))
}
@ -114,7 +114,7 @@ impl SyncManager {
to_vec(&shared_op.record_id).unwrap(),
kind.to_string(),
to_vec(&shared_op.data).unwrap(),
node::pub_id::equals(op.node.as_bytes().to_vec()),
instance::pub_id::equals(op.instance.as_bytes().to_vec()),
vec![],
),
query,
@ -141,7 +141,7 @@ impl SyncManager {
.shared_operation()
.find_many(vec![])
.order_by(shared_operation::timestamp::order(SortOrder::Asc))
.include(shared_operation::include!({ node: select {
.include(shared_operation::include!({ instance: select {
pub_id
} }))
.exec()
@ -150,7 +150,7 @@ impl SyncManager {
.flat_map(|op| {
Some(CRDTOperation {
id: Uuid::from_slice(&op.id).ok()?,
node: Uuid::from_slice(&op.node.pub_id).ok()?,
instance: Uuid::from_slice(&op.instance.pub_id).ok()?,
timestamp: NTP64(op.timestamp as u64),
typ: CRDTOperationType::Shared(SharedOperation {
record_id: serde_json::from_slice(&op.record_id).ok()?,
@ -166,8 +166,8 @@ impl SyncManager {
let db = &self.db;
if db
.node()
.find_unique(node::pub_id::equals(op.node.as_bytes().to_vec()))
.instance()
.find_unique(instance::pub_id::equals(op.instance.as_bytes().to_vec()))
.exec()
.await?
.is_none()
@ -321,7 +321,7 @@ impl SyncManager {
to_vec(&shared_op.record_id).unwrap(),
kind.to_string(),
to_vec(&shared_op.data).unwrap(),
node::pub_id::equals(op.node.as_bytes().to_vec()),
instance::pub_id::equals(op.instance.as_bytes().to_vec()),
vec![],
)
.exec()
@ -337,7 +337,7 @@ impl SyncManager {
let timestamp = self.clock.new_timestamp();
CRDTOperation {
node: self.node,
instance: self.instance,
timestamp: *timestamp.get_time(),
id: Uuid::new_v4(),
typ,

View file

@ -8,7 +8,7 @@ use std::{
use crate::{
job::JobManagerError,
library::{LibraryConfig, LibraryManagerError, LibraryName},
library::{LibraryManagerError, LibraryName},
location::{
delete_location, scan_location, LocationCreateArgs, LocationError, LocationManagerError,
},
@ -17,7 +17,6 @@ use crate::{
util::AbortOnDrop,
};
use prisma_client_rust::QueryError;
use sd_p2p::spacetunnel::Identity;
use serde::Deserialize;
use thiserror::Error;
use tokio::{
@ -114,18 +113,8 @@ impl InitConfig {
let library = match library_manager.get_library(lib.id).await {
Some(lib) => lib,
None => {
let node_pub_id = Uuid::new_v4();
let library = library_manager
.create_with_uuid(
lib.id,
LibraryConfig {
name: lib.name,
description: lib.description,
identity: Identity::new().to_bytes(),
node_id: node_pub_id,
},
node_cfg.clone(),
)
.create_with_uuid(lib.id, lib.name, lib.description, node_cfg.clone())
.await?;
match library_manager.get_library(library.uuid).await {

View file

@ -1,7 +1,7 @@
use std::{
any::type_name,
fs::File,
io::{self, BufReader, Seek, Write},
io::{self, Seek, Write},
path::{Path, PathBuf},
};
@ -41,31 +41,22 @@ pub trait Migrate: Sized + DeserializeOwned + Serialize {
async fn load_and_migrate(path: &Path, ctx: &Self::Ctx) -> Result<Self, MigratorError> {
match path.try_exists()? {
true => {
let mut file = File::options().read(true).write(true).open(path)?;
let mut cfg: BaseConfig = match serde_json::from_reader(BufReader::new(&mut file)) {
let mut file = File::options().write(true).read(true).open(path)?;
let mut cfg: BaseConfig = match serde_json::from_reader(&mut file) {
Ok(cfg) => cfg,
Err(err) => {
// This is for backwards compatibility for the backwards compatibility cause the super super old system store the version as a string.
{
file.rewind()?;
let mut y = match serde_json::from_reader::<_, Value>(BufReader::new(
&mut file,
)) {
Ok(y) => y,
Err(_) => {
return Err(err.into());
}
};
file.rewind()?;
let mut cfg = serde_json::from_reader::<_, Value>(file)?;
if let Some(obj) = y.as_object_mut() {
if obj.contains_key("version") {
return Err(MigratorError::HasSuperLegacyConfig); // This is just to make the error nicer
} else {
return Err(err.into());
}
if let Some(obj) = cfg.as_object_mut() {
if obj.contains_key("version") {
return Err(MigratorError::HasSuperLegacyConfig); // This is just to make the error nicer
} else {
return Err(err.into());
}
} else {
return Err(err.into());
}
}
};
@ -81,6 +72,7 @@ pub trait Migrate: Sized + DeserializeOwned + Serialize {
match Self::migrate(v, &mut cfg.other, ctx).await {
Ok(()) => (),
Err(err) => {
file.set_len(0)?; // Truncate the file
file.write_all(serde_json::to_string(&cfg)?.as_bytes())?; // Writes updated version
return Err(err);
}
@ -88,6 +80,7 @@ pub trait Migrate: Sized + DeserializeOwned + Serialize {
}
if !is_latest {
file.set_len(0)?; // Truncate the file
file.write_all(serde_json::to_string(&cfg)?.as_bytes())?; // Writes updated version
}

View file

@ -5,7 +5,9 @@ pub mod debug_initializer;
pub mod error;
mod maybe_undefined;
pub mod migrator;
mod observable;
pub mod version_manager;
pub use abort_on_drop::*;
pub use maybe_undefined::*;
pub use observable::*;

101
core/src/util/observable.rs Normal file
View file

@ -0,0 +1,101 @@
#![allow(dead_code)]
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
ops::{Deref, DerefMut},
};
use tokio::sync::{Notify, RwLock, RwLockReadGuard, RwLockWriteGuard};
/// A simple JS-style observable in Rust
pub struct Observable<T> {
t: RwLock<T>,
notify: Notify,
}
impl<T> Observable<T>
where
T: Hash,
{
pub fn new(t: T) -> Self {
Self {
t: RwLock::new(t),
notify: Notify::new(),
}
}
pub async fn get_mut(&self) -> ObservableRef<'_, T> {
let t = self.t.write().await;
ObservableRef {
start_hash: {
let mut s = DefaultHasher::new();
t.hash(&mut s);
s.finish()
},
t,
notify: &self.notify,
}
}
pub async fn set(&self, t: T) {
*self.get_mut().await = t;
}
pub async fn get(&self) -> RwLockReadGuard<'_, T> {
self.t.read().await
}
/// Wait until the value changes, then return the new value
pub async fn wait(&self) -> T
where
T: Clone,
{
self.notify.notified().await;
self.t.read().await.clone()
}
}
pub struct ObservableRef<'a, T>
where
T: Hash,
{
t: RwLockWriteGuard<'a, T>,
notify: &'a Notify,
start_hash: u64,
}
impl<T> Deref for ObservableRef<'_, T>
where
T: Hash,
{
type Target = T;
fn deref(&self) -> &Self::Target {
&self.t
}
}
impl<T> DerefMut for ObservableRef<'_, T>
where
T: Hash,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.t
}
}
impl<T> Drop for ObservableRef<'_, T>
where
T: Hash,
{
fn drop(&mut self) {
let mut s = DefaultHasher::new();
self.t.hash(&mut s);
if self.start_hash != s.finish() {
self.notify.notify_waiters();
}
}
}

View file

@ -35,6 +35,7 @@ arc-swap = "1.6.0"
p384 = { version = "0.13.0", feature = ["ecdh"] }
ed25519-dalek = { version = "1.0.1", features = ["rand"] }
rand_core = { version = "0.5.1", feature = ["getrandom"] }
uuid = "1.4.0"
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread"] }

View file

@ -1,6 +1,6 @@
use std::{collections::HashMap, env, time::Duration};
use sd_p2p::{spacetime::SpaceTimeStream, Event, Keypair, Manager, Metadata, MetadataManager};
use sd_p2p::{Event, Keypair, Manager, Metadata, MetadataManager};
use tokio::{io::AsyncReadExt, time::sleep};
use tracing::{debug, error, info};
@ -70,28 +70,25 @@ async fn main() {
);
event.dial().await; // We connect to everyone we find on the network. Your app will probs wanna restrict this!
}
Event::PeerMessage(event) => {
debug!("Peer '{}' established stream", event.peer_id);
Event::PeerMessage(mut event) => {
debug!("Peer '{}' established unicast stream", event.peer_id);
tokio::spawn(async move {
match event.stream {
SpaceTimeStream::Broadcast(mut stream) => {
let mut buf = [0; 100];
let n = stream.read(&mut buf).await.unwrap();
println!(
"GOT BROADCAST: {:?}",
std::str::from_utf8(&buf[..n]).unwrap()
);
}
SpaceTimeStream::Unicast(mut stream) => {
let mut buf = [0; 100];
let n = stream.read(&mut buf).await.unwrap();
println!(
"GOT UNICAST: {:?}",
std::str::from_utf8(&buf[..n]).unwrap()
);
}
}
let mut buf = [0; 100];
let n = event.stream.read(&mut buf).await.unwrap();
println!("GOT UNICAST: {:?}", std::str::from_utf8(&buf[..n]).unwrap());
});
}
Event::PeerBroadcast(mut event) => {
debug!("Peer '{}' established broadcast stream", event.peer_id);
tokio::spawn(async move {
let mut buf = [0; 100];
let n = event.stream.read(&mut buf).await.unwrap();
println!(
"GOT BROADCAST: {:?}",
std::str::from_utf8(&buf[..n]).unwrap()
);
});
}
Event::Shutdown => {

View file

@ -0,0 +1,5 @@
mod mdns;
mod metadata_manager;
pub use mdns::*;
pub use metadata_manager::*;

View file

@ -1,6 +1,9 @@
use std::{net::SocketAddr, sync::Arc};
use crate::{spacetime::SpaceTimeStream, ConnectedPeer, DiscoveredPeer, Manager, Metadata};
use crate::{
spacetime::{BroadcastStream, UnicastStream},
ConnectedPeer, DiscoveredPeer, Manager, Metadata,
};
use super::PeerId;
@ -8,9 +11,6 @@ use super::PeerId;
/// This is useful for updating your UI when stuff changes on the backend.
/// You can also interact with some events to cause an event.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[cfg_attr(feature = "specta", derive(specta::Type))]
#[cfg_attr(feature = "serde", serde(tag = "type"))]
pub enum Event<TMetadata: Metadata> {
/// add a network interface on this node to listen for
AddListenAddr(SocketAddr),
@ -29,25 +29,32 @@ pub enum Event<TMetadata: Metadata> {
PeerConnected(ConnectedPeer),
/// communication was lost with a peer.
PeerDisconnected(PeerId),
/// the peer has opened a new substream
#[cfg_attr(any(feature = "serde", feature = "specta"), serde(skip))]
PeerMessage(PeerMessageEvent<TMetadata>),
/// the peer has opened a new unicast substream
PeerMessage(PeerMessageEvent<TMetadata, UnicastStream>),
/// the peer has opened a new brodcast substream
PeerBroadcast(PeerMessageEvent<TMetadata, BroadcastStream>),
/// the node is shutting down
Shutdown,
}
#[derive(Debug)]
pub struct PeerMessageEvent<TMetadata: Metadata> {
pub struct PeerMessageEvent<TMetadata: Metadata, S> {
pub stream_id: u64,
pub peer_id: PeerId,
pub manager: Arc<Manager<TMetadata>>,
pub stream: SpaceTimeStream,
pub stream: S,
// Prevent manual creation by end-user
pub(crate) _priv: (),
}
impl<TMetadata: Metadata> From<PeerMessageEvent<TMetadata>> for Event<TMetadata> {
fn from(event: PeerMessageEvent<TMetadata>) -> Self {
impl<TMetadata: Metadata> From<PeerMessageEvent<TMetadata, UnicastStream>> for Event<TMetadata> {
fn from(event: PeerMessageEvent<TMetadata, UnicastStream>) -> Self {
Self::PeerMessage(event)
}
}
impl<TMetadata: Metadata> From<PeerMessageEvent<TMetadata, BroadcastStream>> for Event<TMetadata> {
fn from(event: PeerMessageEvent<TMetadata, BroadcastStream>) -> Self {
Self::PeerBroadcast(event)
}
}

View file

@ -1,20 +1,19 @@
//! Rust Peer to Peer Networking Library
mod discovery;
mod event;
mod manager;
mod manager_stream;
mod mdns;
mod metadata_manager;
mod peer;
pub mod proto;
pub mod spaceblock;
pub mod spacetime;
pub mod spacetunnel;
mod utils;
pub use discovery::*;
pub use event::*;
pub use manager::*;
pub use manager_stream::*;
pub use mdns::*;
pub use metadata_manager::*;
pub use peer::*;
pub use utils::*;

View file

@ -130,6 +130,7 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
})
}
// TODO: Does this need any timeouts to be added cause hanging forever is bad?
#[allow(clippy::unused_unit)] // TODO: Remove this clippy override once error handling is added
pub async fn stream(&self, peer_id: PeerId) -> Result<UnicastStream, ()> {
// TODO: With this system you can send to any random peer id. Can I reduce that by requiring `.connect(peer_id).unwrap().send(data)` or something like that.

81
crates/p2p/src/proto.rs Normal file
View file

@ -0,0 +1,81 @@
//! Temporary library for easier binary encoding/decoding.
//!
//! Eventually these will be deprecated by macros but I can't find one which supports large payloads (basically it needs to write to async stream not in-memory bufffer) -> Binario is my own prototype of a Rust library to do this but it's not prod ready yet.
//!
use thiserror::Error;
use uuid::Uuid;
pub mod decode {
use crate::spacetunnel::IdentityErr;
use super::*;
use tokio::io::{AsyncRead, AsyncReadExt};
#[derive(Error, Debug)]
pub enum Error {
#[error("IoError({0})")]
IoError(#[from] std::io::Error),
#[error("UuidFormatError({0})")]
UuidFormatError(#[from] uuid::Error),
#[error("NameFormatError({0})")]
NameFormatError(#[from] std::string::FromUtf8Error),
#[error("InvalidRemoteIdentity({0})")]
InvalidRemoteIdentity(#[from] IdentityErr),
}
/// Deserialize uuid as it's fixed size data.
pub async fn uuid(stream: &mut (impl AsyncRead + Unpin)) -> Result<Uuid, Error> {
let mut buf = vec![0u8; 16];
stream.read_exact(&mut buf).await?;
Uuid::from_slice(&buf).map_err(Into::into)
}
/// Deserialize string as it's u16 length and data.
pub async fn string(stream: &mut (impl AsyncRead + Unpin)) -> Result<String, Error> {
let len = stream.read_u16_le().await?;
let mut buf = vec![0u8; len as usize];
stream.read_exact(&mut buf).await?;
String::from_utf8(buf).map_err(Into::into)
}
/// Deserialize buf as it's u16 length and data.
pub async fn buf(stream: &mut (impl AsyncRead + Unpin)) -> Result<Vec<u8>, Error> {
let len = stream.read_u16_le().await?;
let mut buf = vec![0u8; len as usize];
stream.read_exact(&mut buf).await?;
Ok(buf)
}
}
pub mod encode {
use super::*;
/// Serialize uuid as it's fixed size data.
pub fn uuid(buf: &mut Vec<u8>, uuid: &Uuid) {
buf.extend(uuid.as_bytes());
}
/// Serialize string as it's u16 length and data.
pub fn string(buf: &mut Vec<u8>, s: &str) {
let len_buf = (s.len() as u16).to_le_bytes();
if s.len() > u16::MAX as usize {
panic!("String is too long!"); // TODO: Error handling
}
buf.extend_from_slice(&len_buf);
buf.extend(s.as_bytes());
}
/// Serialize buf as it's u16 length and data.
pub fn buf(buf: &mut Vec<u8>, b: &[u8]) {
let len_buf = (b.len() as u16).to_le_bytes();
if b.len() > u16::MAX as usize {
panic!("Buf is too long!"); // TODO: Error handling
}
buf.extend_from_slice(&len_buf);
buf.extend(b);
}
}

View file

@ -0,0 +1,46 @@
use tokio::io::AsyncReadExt;
/// TODO
pub struct Block<'a> {
// TODO: File content, checksum, source location so it can be resent!
pub offset: u64,
pub size: u64,
pub data: &'a [u8],
// TODO: Checksum?
}
impl<'a> Block<'a> {
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::new();
buf.extend_from_slice(&self.offset.to_le_bytes());
buf.extend_from_slice(&self.size.to_le_bytes());
buf.extend_from_slice(self.data);
buf
}
pub async fn from_stream(
stream: &mut (impl AsyncReadExt + Unpin),
data_buf: &mut [u8],
) -> Result<Block<'a>, ()> {
let mut offset = [0; 8];
stream.read_exact(&mut offset).await.map_err(|_| ())?; // TODO: Error handling
let offset = u64::from_le_bytes(offset);
let mut size = [0; 8];
stream.read_exact(&mut size).await.map_err(|_| ())?; // TODO: Error handling
let size = u64::from_le_bytes(size);
// TODO: Ensure `size` is `block_size` or smaller else buffer overflow
stream
.read_exact(&mut data_buf[..size as usize])
.await
.map_err(|_| ())?; // TODO: Error handling
Ok(Self {
offset,
size,
data: &[], // TODO: This is super cringe. Data should be decoded here but lifetimes and extra allocations become a major concern.
})
}
}

View file

@ -0,0 +1,22 @@
/// TODO
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockSize(u32); // Max block size is gonna be 3.9GB which is stupidly overkill
impl BlockSize {
// TODO: Validating `BlockSize` are multiple of 2, i think. Idk why but BEP does it.
pub fn from_size(size: u64) -> Self {
// TODO: Something like: https://docs.syncthing.net/specs/bep-v1.html#selection-of-block-size
Self(131072) // 128 KiB
}
/// This is super dangerous as it doesn't enforce any assumptions of the protocol and is designed just for tests.
#[cfg(test)]
pub fn dangerously_new(size: u32) -> Self {
Self(size)
}
pub fn size(&self) -> u32 {
self.0
}
}

View file

@ -16,144 +16,18 @@ use tokio::{
};
use tracing::debug;
use crate::spacetime::{SpaceTimeStream, UnicastStream};
use crate::{
proto::{decode, encode},
spacetime::UnicastStream,
};
/// TODO
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockSize(u32); // Max block size is gonna be 3.9GB which is stupidly overkill
mod block;
mod block_size;
mod sb_request;
impl BlockSize {
// TODO: Validating `BlockSize` are multiple of 2, i think. Idk why but BEP does it.
pub fn from_size(size: u64) -> Self {
// TODO: Something like: https://docs.syncthing.net/specs/bep-v1.html#selection-of-block-size
Self(131072) // 128 KiB
}
/// This is super dangerous as it doesn't enforce any assumptions of the protocol and is designed just for tests.
#[cfg(test)]
pub fn dangerously_new(size: u32) -> Self {
Self(size)
}
pub fn size(&self) -> u32 {
self.0
}
}
/// TODO
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpaceblockRequest {
pub name: String,
pub size: u64,
// TODO: Include file permissions
pub block_size: BlockSize,
}
#[derive(Debug, Error)]
pub enum SpacedropRequestError {
#[error("io error reading name len: {0}")]
NameLenIoError(std::io::Error),
#[error("io error reading name: {0}")]
NameIoError(std::io::Error),
#[error("error utf-8 decoding name: {0}")]
NameFormatError(FromUtf8Error),
#[error("io error reading file size: {0}")]
SizeIoError(std::io::Error),
}
impl SpaceblockRequest {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> Result<Self, SpacedropRequestError> {
let name = {
let len = stream
.read_u16_le()
.await
.map_err(SpacedropRequestError::NameLenIoError)?;
let mut buf = vec![0u8; len as usize];
stream
.read_exact(&mut buf)
.await
.map_err(SpacedropRequestError::NameIoError)?;
String::from_utf8(buf).map_err(SpacedropRequestError::NameFormatError)?
};
let size = stream
.read_u64_le()
.await
.map_err(SpacedropRequestError::SizeIoError)?;
let block_size = BlockSize::from_size(size); // TODO: Get from stream: stream.read_u8().await.map_err(|_| ())?; // TODO: Error handling
Ok(Self {
name,
size,
block_size,
})
}
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::new();
let len_buf = (self.name.len() as u16).to_le_bytes();
if self.name.len() > u16::MAX as usize {
panic!("Name is too long!"); // TODO: Error handling
}
buf.extend_from_slice(&len_buf);
buf.extend(self.name.as_bytes());
buf.extend_from_slice(&self.size.to_le_bytes());
buf
}
}
/// TODO
pub struct Block<'a> {
// TODO: File content, checksum, source location so it can be resent!
pub offset: u64,
pub size: u64,
pub data: &'a [u8],
// TODO: Checksum?
}
impl<'a> Block<'a> {
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::new();
buf.extend_from_slice(&self.offset.to_le_bytes());
buf.extend_from_slice(&self.size.to_le_bytes());
buf.extend_from_slice(self.data);
buf
}
pub async fn from_stream(
stream: &mut (impl AsyncReadExt + Unpin),
data_buf: &mut [u8],
) -> Result<Block<'a>, ()> {
let mut offset = [0; 8];
stream.read_exact(&mut offset).await.map_err(|_| ())?; // TODO: Error handling
let offset = u64::from_le_bytes(offset);
let mut size = [0; 8];
stream.read_exact(&mut size).await.map_err(|_| ())?; // TODO: Error handling
let size = u64::from_le_bytes(size);
// TODO: Ensure `size` is `block_size` or smaller else buffer overflow
stream
.read_exact(&mut data_buf[..size as usize])
.await
.map_err(|_| ())?; // TODO: Error handling
Ok(Self {
offset,
size,
data: &[], // TODO: This is super cringe. Data should be decoded here but lifetimes and extra allocations become a major concern.
})
}
}
pub use block::*;
pub use block_size::*;
pub use sb_request::*;
/// TODO
pub struct Transfer<'a, F> {

View file

@ -0,0 +1,59 @@
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt};
use crate::proto::{decode, encode};
use super::BlockSize;
/// TODO
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpaceblockRequest {
pub name: String,
pub size: u64,
// TODO: Include file permissions
pub block_size: BlockSize,
}
#[derive(Debug, Error)]
pub enum SpacedropRequestError {
#[error("SpacedropRequestError::Name({0})")]
Name(decode::Error),
#[error("SpacedropRequestError::Size({0})")]
Size(std::io::Error),
}
impl SpaceblockRequest {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> Result<Self, SpacedropRequestError> {
let name = decode::string(stream)
.await
.map_err(SpacedropRequestError::Name)?;
let size = stream
.read_u64_le()
.await
.map_err(SpacedropRequestError::Size)?;
let block_size = BlockSize::from_size(size); // TODO: Get from stream: stream.read_u8().await.map_err(|_| ())?; // TODO: Error handling
Ok(Self {
name,
size,
block_size,
})
}
pub fn to_bytes(&self) -> Vec<u8> {
let Self {
name,
size,
block_size,
} = self;
let mut buf = Vec::new();
encode::string(&mut buf, name);
buf.extend_from_slice(&self.size.to_le_bytes());
buf
}
}

View file

@ -5,11 +5,16 @@ use std::{
};
use libp2p::{core::UpgradeInfo, swarm::NegotiatedSubstream, InboundUpgrade};
use tokio::io::AsyncReadExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;
use crate::{Manager, ManagerStreamAction, Metadata, PeerId, PeerMessageEvent};
use crate::{
spacetime::{BroadcastStream, UnicastStream},
Manager, ManagerStreamAction, Metadata, PeerId, PeerMessageEvent,
};
use super::{SpaceTimeProtocolName, SpaceTimeStream};
use super::SpaceTimeProtocolName;
pub struct InboundProtocol<TMetadata: Metadata> {
pub(crate) peer_id: PeerId,
@ -38,23 +43,38 @@ impl<TMetadata: Metadata> InboundUpgrade<NegotiatedSubstream> for InboundProtoco
self.peer_id
);
let stream = SpaceTimeStream::from_stream(io).await;
debug!(
"stream({}, {id}): stream of type {} accepted",
self.peer_id,
stream.stream_type(),
);
Ok(ManagerStreamAction::Event(
PeerMessageEvent {
stream_id: id,
peer_id: self.peer_id,
manager: self.manager.clone(),
stream,
_priv: (),
let mut io = io.compat();
let discriminator = io.read_u8().await.unwrap(); // TODO: Timeout on this
match discriminator {
crate::spacetime::BROADCAST_DISCRIMINATOR => {
debug!("stream({}, {id}): broadcast stream accepted", self.peer_id);
Ok(ManagerStreamAction::Event(
PeerMessageEvent {
stream_id: id,
peer_id: self.peer_id,
manager: self.manager.clone(),
stream: BroadcastStream::new(io),
_priv: (),
}
.into(),
))
}
.into(),
))
crate::spacetime::UNICAST_DISCRIMINATOR => {
debug!("stream({}, {id}): unicast stream accepted", self.peer_id);
Ok(ManagerStreamAction::Event(
PeerMessageEvent {
stream_id: id,
peer_id: self.peer_id,
manager: self.manager.clone(),
stream: UnicastStream::new(io),
_priv: (),
}
.into(),
))
}
_ => todo!(), // TODO: Error handling
}
})
}
}

View file

@ -10,6 +10,7 @@ use libp2p::{
OutboundUpgrade,
};
use tokio::sync::oneshot;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::error;
use super::{SpaceTimeProtocolName, UnicastStream, BROADCAST_DISCRIMINATOR};
@ -62,7 +63,7 @@ impl OutboundUpgrade<NegotiatedSubstream> for OutboundProtocol {
}
OutboundRequest::Unicast(sender) => {
// We write the discriminator to the stream in the `Manager::stream` method before returning the stream to the user to make async a tad nicer.
sender.send(UnicastStream::new(io)).unwrap();
sender.send(UnicastStream::new(io.compat())).unwrap();
}
}

View file

@ -5,75 +5,22 @@ use std::{
};
use libp2p::{futures::AsyncWriteExt, swarm::NegotiatedSubstream};
use tokio::io::{
AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt as TokioAsyncWriteExt, ReadBuf,
};
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
use tracing::error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt as TokioAsyncWriteExt, ReadBuf};
use tokio_util::compat::Compat;
pub const BROADCAST_DISCRIMINATOR: u8 = 0;
pub const UNICAST_DISCRIMINATOR: u8 = 1;
#[derive(Debug)]
pub enum SpaceTimeStream {
Broadcast(BroadcastStream),
Unicast(UnicastStream),
}
impl SpaceTimeStream {
pub(crate) async fn from_stream(io: NegotiatedSubstream) -> Self {
let mut io = io.compat();
let discriminator = io.read_u8().await.unwrap(); // TODO: Timeout on this
match discriminator {
BROADCAST_DISCRIMINATOR => Self::Broadcast(BroadcastStream(Some(io))),
UNICAST_DISCRIMINATOR => Self::Unicast(UnicastStream(io)),
_ => todo!(), // TODO: Error handling
}
}
pub fn stream_type(&self) -> &'static str {
match self {
Self::Broadcast(_) => "broadcast",
Self::Unicast(_) => "unicast",
}
}
pub async fn close(self) -> Result<(), io::Error> {
match self {
Self::Broadcast(mut stream) => {
if let Some(stream) = stream.0.take() {
BroadcastStream::close_inner(stream).await
} else if cfg!(debug_assertions) {
panic!("'BroadcastStream' should never be 'None' here!");
} else {
error!("'BroadcastStream' should never be 'None' here!");
Ok(())
}
}
Self::Unicast(stream) => stream.0.into_inner().close().await,
}
}
}
impl AsyncRead for SpaceTimeStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match self.get_mut() {
Self::Broadcast(stream) => Pin::new(stream).poll_read(cx, buf),
Self::Unicast(stream) => Pin::new(stream).poll_read(cx, buf),
}
}
}
/// A broadcast is a message sent to many peers in the network.
/// Due to this it is not possible to respond to a broadcast.
#[derive(Debug)]
pub struct BroadcastStream(Option<Compat<NegotiatedSubstream>>);
impl BroadcastStream {
pub(crate) fn new(stream: Compat<NegotiatedSubstream>) -> Self {
Self(Some(stream))
}
async fn close_inner(mut io: Compat<NegotiatedSubstream>) -> Result<(), io::Error> {
io.write_all(&[b'D']).await?;
io.flush().await?;
@ -114,8 +61,8 @@ pub struct UnicastStream(Compat<NegotiatedSubstream>);
// TODO: Utils for sending msgpack and stuff over the stream. -> Have a max size of reading buffers so we are less susceptible to DoS attacks.
impl UnicastStream {
pub(crate) fn new(io: NegotiatedSubstream) -> Self {
Self(io.compat())
pub(crate) fn new(io: Compat<NegotiatedSubstream>) -> Self {
Self(io)
}
pub(crate) async fn write_discriminator(&mut self) -> io::Result<()> {

View file

@ -7,8 +7,15 @@ use thiserror::Error;
pub struct IdentityErr(#[from] ed25519_dalek::ed25519::Error);
/// TODO
#[derive(Debug)]
pub struct Identity(ed25519_dalek::Keypair);
impl PartialEq for Identity {
fn eq(&self, other: &Self) -> bool {
self.0.public.eq(&other.0.public)
}
}
impl Default for Identity {
fn default() -> Self {
Self(ed25519_dalek::Keypair::generate(&mut OsRng))

View file

@ -1,4 +1,4 @@
//! A system for creating encrypted tunnels between peers on untrusted connections.
//! A system for creating encrypted tunnels between peers over untrusted connections.
mod identity;
mod tunnel;

View file

@ -71,7 +71,7 @@ pub enum CRDTOperationType {
#[derive(Serialize, Deserialize, Clone, Type)]
pub struct CRDTOperation {
pub node: Uuid,
pub instance: Uuid,
#[specta(type = u32)]
pub timestamp: NTP64,
pub id: Uuid,
@ -82,7 +82,7 @@ pub struct CRDTOperation {
impl Debug for CRDTOperation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CRDTOperation")
.field("node", &self.node.to_string())
.field("instance", &self.instance.to_string())
.field("timestamp", &self.timestamp.to_string())
.field("typ", &self.typ)
.finish()

View file

@ -106,7 +106,7 @@ export function ErrorPage({
window.__TAURI_INVOKE__('reset_spacedrive');
}}
>
Reset Spacedrive & Quit App
Reset & Quit App
</Button>
</div>
)}

View file

@ -2,14 +2,14 @@ import clsx from 'clsx';
import { Repeat, Trash } from 'phosphor-react';
import { useState } from 'react';
import { useNavigate } from 'react-router';
import { Location, Node, arraysEqual, useLibraryMutation, useOnlineLocations } from '@sd/client';
import { Location, arraysEqual, useLibraryMutation, useOnlineLocations } from '@sd/client';
import { Button, Card, Tooltip, dialogManager } from '@sd/ui';
import { Folder } from '~/components';
import { useIsDark } from '~/hooks';
import DeleteDialog from './DeleteDialog';
interface Props {
location: Location & { node: Node | null };
location: Location;
}
export default ({ location }: Props) => {
@ -36,11 +36,12 @@ export default ({ location }: Props) => {
<div className="grid min-w-[110px] grid-cols-1">
<h1 className="truncate pt-0.5 text-sm font-semibold">{location.name}</h1>
<p className="mt-0.5 select-text truncate text-sm text-ink-dull">
{location.node && (
{/* // TODO: This is ephemeral so it should not come from the DB. Eg. a external USB can move between nodes */}
{/* {location.node && (
<span className="mr-1 rounded bg-app-selected px-1 py-[1px]">
{location.node.name}
</span>
)}
)} */}
{location.path}
</p>
</div>

View file

@ -1,5 +1,6 @@
import { useDiscoveredPeers, useFeatureFlag, useLibraryMutation } from '@sd/client';
import { useBridgeMutation, useDiscoveredPeers, useFeatureFlag } from '@sd/client';
import { Button } from '@sd/ui';
import { startPairing } from '~/app/p2p/pairing';
import { Heading } from '../Layout';
export const Component = () => {
@ -11,9 +12,9 @@ export const Component = () => {
title="Nodes"
description="Manage the nodes connected to this library. A node is an instance of Spacedrive's backend, running on a device or server. Each node carries a copy of the database and synchronizes via peer-to-peer connections in realtime."
/>
{/* TODO: Show paired nodes + unpair button */}
{/* TODO: Replace with modal */}
{isPairingEnabled && <IncorrectP2PPairingPane />}
</>
);
@ -22,14 +23,12 @@ export const Component = () => {
// TODO: This entire component shows a UI which is pairing by node but that is just not how it works.
function IncorrectP2PPairingPane() {
const onlineNodes = useDiscoveredPeers();
const p2pPair = useLibraryMutation('p2p.pair', {
const p2pPair = useBridgeMutation('p2p.pair', {
onSuccess(data) {
console.log(data);
}
});
console.log(onlineNodes);
return (
<>
<h1>Pairing</h1>
@ -37,7 +36,19 @@ function IncorrectP2PPairingPane() {
<div key={id} className="flex space-x-2">
<p>{node.name}</p>
<Button onClick={() => p2pPair.mutate(id)}>Pair</Button>
<Button
onClick={() => {
// TODO: This is not great
p2pPair.mutateAsync(id).then((id) =>
startPairing(id, {
name: node.name,
os: node.operating_system
})
);
}}
>
Pair
</Button>
</div>
))}
</>

View file

@ -83,86 +83,9 @@ function DropItem(props: DropItemProps) {
);
}
// const schema = z.object({
// target_peer: z.string(),
// file_path: z.string()
// });
// // TODO: This will be removed and properly hooked up to the UI in the future
// function TemporarySpacedropDemo() {
// const [[discoveredPeers], setDiscoveredPeer] = useState([new Map<string, PeerMetadata>()]);
// const doSpacedrop = useBridgeMutation('p2p.spacedrop');
// const form = useZodForm({
// schema
// });
// useBridgeSubscription(['p2p.events'], {
// onData(data) {
// if (data.type === 'DiscoveredPeer') {
// setDiscoveredPeer([discoveredPeers.set(data.peer_id, data.metadata)]);
// // if (!form.getValues().target_peer) form.setValue('target_peer', data.peer_id);
// }
// }
// });
// const onSubmit = form.handleSubmit((data) => {
// doSpacedrop.mutate({
// peer_id: data.target_peer,
// file_path: data.file_path
// });
// });
// // TODO: Input select
// return (
// <Form onSubmit={onSubmit} form={form}>
// <ScreenHeading>Spacedrop Demo</ScreenHeading>
// <p className="text-xs text-ink-dull">
// Note: Right now the file must be less than 255 bytes long and only contain UTF-8
// chars. Create a txt file in Vscode to test (note macOS TextEdit cause that is rtf by
// default)
// </p>
// <div className="mt-2 flex flex-row items-center space-x-4">
// <Input
// size="sm"
// placeholder="/Users/oscar/Desktop/sd/demo.txt"
// value="/Users/jamie/Desktop/Jeff.txt"
// className="w-full"
// {...form.register('file_path')}
// />
// <Button className="block shrink-0" variant="gray">
// Select File
// </Button>
// <Select
// onChange={(e) => form.setValue('target_peer', e)}
// value={form.watch('target_peer')}
// >
// {[...discoveredPeers.entries()].map(([peerId, metadata], index) => (
// <SelectOption default={index === 0} key={peerId} value={peerId}>
// {metadata.name}
// </SelectOption>
// ))}
// </Select>
// <Button
// disabled={!form.getValues().target_peer}
// className="block shrink-0"
// variant="accent"
// type="submit"
// >
// Send
// </Button>
// </div>
// </Form>
// );
// }
export const Component = () => {
return (
<>
{/* <TemporarySpacedropDemo /> */}
<div className={classes.honeycombOuter}>
<div className={clsx(classes.honeycombContainer, 'mt-8')}></div>
</div>

View file

@ -3,7 +3,9 @@ import {
useBridgeMutation,
useBridgeSubscription,
useDiscoveredPeers,
useFeatureFlag
useFeatureFlag,
useP2PEvents,
withFeatureFlag
} from '@sd/client';
import {
Dialog,
@ -16,37 +18,25 @@ import {
useZodForm,
z
} from '@sd/ui';
import { getSpacedropState, subscribeSpacedropState } from '../hooks/useSpacedropState';
import { getSpacedropState, subscribeSpacedropState } from '../../hooks/useSpacedropState';
export function SpacedropUI() {
const isSpacedropEnabled = useFeatureFlag('spacedrop');
if (!isSpacedropEnabled) {
return null;
}
return <SpacedropUIInner />;
}
function SpacedropUIInner() {
useEffect(() =>
subscribeSpacedropState(() => {
dialogManager.create((dp) => <SpacedropDialog {...dp} />);
})
);
// TODO: In a perfect world, this would not exist as it means we have two open subscriptions for the same data (the other one being in `useP2PEvents.tsx` in `@sd/client`). It's just hard so we will eat the overhead for now.
useBridgeSubscription(['p2p.events'], {
onData(data) {
if (data.type === 'SpacedropRequest') {
dialogManager.create((dp) => (
<SpacedropRequestDialog
dropId={data.id}
name={data.name}
peerId={data.peer_id}
{...dp}
/>
));
}
useP2PEvents((data) => {
if (data.type === 'SpacedropRequest') {
dialogManager.create((dp) => (
<SpacedropRequestDialog
dropId={data.id}
name={data.name}
peerId={data.peer_id}
{...dp}
/>
));
}
});

View file

@ -0,0 +1,23 @@
import { useOnFeatureFlagsChange, useP2PEvents, withFeatureFlag } from '@sd/client';
import { SpacedropUI } from './Spacedrop';
import { startPairing } from './pairing';
export const SpacedropUI2 = withFeatureFlag('spacedrop', SpacedropUI);
// Entrypoint of P2P UI
export function P2P() {
useP2PEvents((data) => {
if (data.type === 'PairingRequest') {
startPairing(data.id, {
name: data.name,
os: data.os
});
}
});
return (
<>
<SpacedropUI2 />
</>
);
}

View file

@ -0,0 +1,145 @@
import { useState } from 'react';
import { P, match } from 'ts-pattern';
import {
OperatingSystem,
useBridgeMutation,
useCachedLibraries,
usePairingStatus
} from '@sd/client';
import {
Button,
Dialog,
Loader,
Select,
SelectOption,
UseDialogProps,
dialogManager,
useDialog,
useZodForm,
z
} from '@sd/ui';
type Node = {
name: string;
os: OperatingSystem | null;
};
export function startPairing(pairing_id: number, node: Node) {
dialogManager.create((dp) => <OriginatorDialog pairingId={pairing_id} node={node} {...dp} />);
}
function OriginatorDialog({
pairingId,
node,
...props
}: { pairingId: number; node: Node } & UseDialogProps) {
const pairingStatus = usePairingStatus(pairingId);
// TODO: If dialog closes before finished, cancel pairing
return (
<Dialog
form={useZodForm({ schema: z.object({}) })}
dialog={useDialog(props)}
title={`Pairing with ${node.name}`}
loading={true}
submitDisabled={pairingStatus?.type !== 'PairingComplete'}
ctaLabel="Done"
// closeLabel="Cancel"
onSubmit={async () => {
alert('TODO');
// TODO: Change into the new library
}}
// onCancelled={() => acceptSpacedrop.mutate([props.dropId, null])}
>
<div className="space-y-2 py-2">
{match(pairingStatus)
.with({ type: 'EstablishingConnection' }, () => (
<PairingLoading msg="Establishing connection..." />
))
.with({ type: 'PairingRequested' }, () => (
<PairingLoading msg="Requesting to pair..." />
))
.with({ type: 'PairingDecisionRequest' }, () => (
<PairingResponder pairingId={pairingId} />
))
.with({ type: 'PairingInProgress', data: P.select() }, (data) => (
<PairingLoading msg={`Pairing into library ${data.library_name}`} />
))
.with({ type: 'InitialSyncProgress', data: P.select() }, (data) => (
<PairingLoading msg={`Syncing library data ${data}/100`} />
))
.with({ type: 'PairingComplete' }, () => <CompletePairing />)
.with({ type: 'PairingRejected' }, () => <PairingRejected />)
.with(undefined, () => <></>)
.exhaustive()}
</div>
</Dialog>
);
}
function PairingResponder({ pairingId }: { pairingId: number }) {
const libraries = useCachedLibraries();
const [selectedLibrary, setSelectedLibrary] = useState<string | undefined>(
libraries.data?.[0]?.uuid
);
const pairingResponse = useBridgeMutation('p2p.pairingResponse');
return (
<>
{selectedLibrary ? (
<Select onChange={(e) => setSelectedLibrary(e)} value={selectedLibrary}>
{libraries.data?.map((lib, index) => (
<SelectOption default={index === 0} key={lib.uuid} value={lib.uuid}>
{lib.config.name}
</SelectOption>
))}
</Select>
) : (
<p>No libraries. Uh oh!</p>
)}
<div className="align-center flex h-full w-full items-center justify-center space-x-2">
<Button
variant="accent"
onClick={() => {
if (selectedLibrary)
pairingResponse.mutate([
pairingId,
{ decision: 'accept', libraryId: selectedLibrary }
]);
}}
>
Accept
</Button>
<Button onClick={() => pairingResponse.mutate([pairingId, { decision: 'reject' }])}>
Reject
</Button>
</div>
</>
);
}
function PairingLoading({ msg }: { msg?: string }) {
return (
<div className="align-center flex h-full w-full flex-col items-center justify-center">
<Loader />
{msg && <p>{msg}</p>}
</div>
);
}
function CompletePairing() {
return (
<div className="flex h-full w-full justify-center">
<p>Pairing Complete!</p>
</div>
);
}
function PairingRejected() {
return (
<div className="flex h-full w-full justify-center">
<p>Pairing Rejected By Remote!</p>
</div>
);
}

View file

@ -10,7 +10,7 @@ import { ErrorBoundary } from 'react-error-boundary';
import { RouterProvider, RouterProviderProps } from 'react-router-dom';
import { P2PContextProvider, useDebugState } from '@sd/client';
import ErrorFallback from './ErrorFallback';
import { SpacedropUI } from './app/Spacedrop';
import { P2P } from './app/p2p';
export { ErrorPage } from './ErrorFallback';
export * from './app';
@ -49,8 +49,8 @@ export const SpacedriveInterface = (props: { router: RouterProviderProps['router
return (
<ErrorBoundary FallbackComponent={ErrorFallback}>
<P2PContextProvider>
<P2P />
<Devtools />
<SpacedropUI />
<RouterProvider router={props.router} />
</P2PContextProvider>
</ErrorBoundary>

View file

@ -16,7 +16,7 @@ export type Procedures = {
{ key: "locations.indexer_rules.get", input: LibraryArgs<number>, result: IndexerRule } |
{ key: "locations.indexer_rules.list", input: LibraryArgs<null>, result: IndexerRule[] } |
{ key: "locations.indexer_rules.listForLocation", input: LibraryArgs<number>, result: IndexerRule[] } |
{ key: "locations.list", input: LibraryArgs<null>, result: { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; node_id: number | null; node: Node | null }[] } |
{ key: "locations.list", input: LibraryArgs<null>, result: Location[] } |
{ key: "nodeState", input: never, result: NodeState } |
{ key: "nodes.listLocations", input: LibraryArgs<string | null>, result: ExplorerItem[] } |
{ key: "preferences.get", input: LibraryArgs<null>, result: LibraryPreferences } |
@ -60,7 +60,8 @@ export type Procedures = {
{ key: "locations.update", input: LibraryArgs<LocationUpdateArgs>, result: null } |
{ key: "nodes.edit", input: ChangeNodeNameArgs, result: null } |
{ key: "p2p.acceptSpacedrop", input: [string, string | null], result: null } |
{ key: "p2p.pair", input: LibraryArgs<PeerId>, result: number } |
{ key: "p2p.pair", input: PeerId, result: number } |
{ key: "p2p.pairingResponse", input: [number, PairingDecision], result: null } |
{ key: "p2p.spacedrop", input: SpacedropArgs, result: string | null } |
{ key: "preferences.update", input: LibraryArgs<LibraryPreferences>, result: null } |
{ key: "tags.assign", input: LibraryArgs<TagAssignArgs>, result: null } |
@ -80,7 +81,7 @@ export type Procedures = {
export type BuildInfo = { version: string; commit: string }
export type CRDTOperation = { node: string; timestamp: number; id: string; typ: CRDTOperationType }
export type CRDTOperation = { instance: string; timestamp: number; id: string; typ: CRDTOperationType }
export type CRDTOperationType = SharedOperation | RelationOperation
@ -158,7 +159,12 @@ export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Faile
*/
export type LibraryArgs<T> = { library_id: string; arg: T }
export type LibraryConfigWrapped = { uuid: string; config: SanitisedLibraryConfig }
/**
* LibraryConfig holds the configuration for a specific library. This is stored as a '{uuid}.sdlibrary' file.
*/
export type LibraryConfig = { name: LibraryName; description: string | null; instance_id: number }
export type LibraryConfigWrapped = { uuid: string; config: LibraryConfig }
export type LibraryName = string
@ -170,7 +176,7 @@ export type ListViewColumnSettings = { hide: boolean; size: number | null }
export type ListViewSettings = { columns: { [key: string]: ListViewColumnSettings }; sort_col: string | null }
export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; node_id: number | null }
export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; instance_id: number | null }
/**
* `LocationCreateArgs` is the argument received from the client using `rspc` to create a new location.
@ -193,7 +199,7 @@ export type LocationUpdateArgs = { id: number; name: string | null; generate_pre
export type LocationViewSettings = { layout: ExplorerLayout; list: ListViewSettings }
export type LocationWithIndexerRules = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; node_id: number | null; indexer_rules: { indexer_rule: IndexerRule }[] }
export type LocationWithIndexerRules = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; instance_id: number | null; indexer_rules: { indexer_rule: IndexerRule }[] }
export type MaybeNot<T> = T | { not: T }
@ -201,8 +207,6 @@ export type MaybeUndefined<T> = null | null | T
export type MediaData = { id: number; pixel_width: number | null; pixel_height: number | null; longitude: number | null; latitude: number | null; fps: number | null; capture_device_make: string | null; capture_device_model: string | null; capture_device_software: string | null; duration_seconds: number | null; codecs: string | null; streams: number | null }
export type Node = { id: number; pub_id: number[]; name: string; platform: number; date_created: string; identity: number[] | null; node_peer_id: string | null }
export type NodeState = ({ id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null }) & { data_path: string }
export type Object = { id: number; pub_id: number[]; kind: number | null; key_id: number | null; hidden: boolean | null; favorite: boolean | null; important: boolean | null; note: string | null; date_created: string | null; date_accessed: string | null }
@ -230,11 +234,15 @@ export type OptionalRange<T> = { from: T | null; to: T | null }
/**
* TODO: P2P event for the frontend
*/
export type P2PEvent = { type: "DiscoveredPeer"; peer_id: PeerId; metadata: PeerMetadata } | { type: "SpacedropRequest"; id: string; peer_id: PeerId; name: string }
export type P2PEvent = { type: "DiscoveredPeer"; peer_id: PeerId; metadata: PeerMetadata } | { type: "SpacedropRequest"; id: string; peer_id: PeerId; name: string } | { type: "PairingRequest"; id: number; name: string; os: OperatingSystem } | { type: "PairingProgress"; id: number; status: PairingStatus }
export type PairingDecision = { decision: "accept"; libraryId: string } | { decision: "reject" }
export type PairingStatus = { type: "EstablishingConnection" } | { type: "PairingRequested" } | { type: "PairingDecisionRequest" } | { type: "PairingInProgress"; data: { library_name: string; library_description: string | null } } | { type: "InitialSyncProgress"; data: number } | { type: "PairingComplete"; data: string } | { type: "PairingRejected" }
export type PeerId = string
export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; version: string | null; email: string | null; img_url: string | null }
export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; version: string | null; email: string | null; img_url: string | null; instances: string[] }
export type RelationOperation = { relation_item: string; relation_group: string; relation: string; data: RelationOperationData }
@ -250,8 +258,6 @@ export type RenameOne = { from_file_path_id: number; to: string }
export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent"
export type SanitisedLibraryConfig = { name: LibraryName; description: string | null; node_id: string }
export type SanitisedNodeConfig = { id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null }
export type SearchData<T> = { cursor: number[] | null; items: T[] }

View file

@ -1,4 +1,5 @@
import { useSnapshot } from 'valtio';
import { useEffect } from 'react';
import { subscribe, useSnapshot } from 'valtio';
import { valtioPersist } from '../lib/valito';
export const features = ['spacedrop', 'p2pPairing'] as const;
@ -18,6 +19,10 @@ export function useFeatureFlag(flag: FeatureFlag | FeatureFlag[]) {
return Array.isArray(flag) ? flag.every((f) => isEnabled(f)) : isEnabled(flag);
}
export function useOnFeatureFlagsChange(callback: (flags: FeatureFlag[]) => void) {
useEffect(() => subscribe(featureFlagState, () => callback(featureFlagState.enabled)));
}
export const isEnabled = (flag: FeatureFlag) => featureFlagState.enabled.find((ff) => flag === ff);
export function toggleFeatureFlag(flags: FeatureFlag | FeatureFlag[]) {
@ -26,9 +31,29 @@ export function toggleFeatureFlag(flags: FeatureFlag | FeatureFlag[]) {
}
flags.forEach((f) => {
if (!featureFlagState.enabled.find((ff) => f === ff)) {
if (f === 'p2pPairing') {
alert(
'Pairing will render your database broken and it WILL need to be reset! Use at your own risk!'
);
}
featureFlagState.enabled.push(f);
} else {
featureFlagState.enabled = featureFlagState.enabled.filter((ff) => f !== ff);
}
});
}
// Render component only when feature flag is enabled
export function withFeatureFlag(
flag: FeatureFlag | FeatureFlag[],
Component: React.FunctionComponent,
fallback: React.ReactNode = null
): React.FunctionComponent {
// @ts-expect-error
return (props) => {
const enabled = useFeatureFlag(flag);
// eslint-disable-next-line react-hooks/rules-of-hooks
return enabled ? <Component /> : fallback;
};
}

View file

@ -1,23 +1,70 @@
import { PropsWithChildren, createContext, useContext, useState } from 'react';
import { PeerMetadata } from '../core';
import {
MutableRefObject,
PropsWithChildren,
createContext,
useContext,
useEffect,
useRef,
useState
} from 'react';
import { P2PEvent, PairingStatus, PeerMetadata } from '../core';
import { useBridgeSubscription } from '../rspc';
const Context = createContext<Map<string, PeerMetadata>>(null as any);
type Context = {
discoveredPeers: Map<string, PeerMetadata>;
pairingStatus: Map<number, PairingStatus>;
events: MutableRefObject<EventTarget>;
};
const Context = createContext<Context>(null as any);
export function P2PContextProvider({ children }: PropsWithChildren) {
const events = useRef(new EventTarget());
const [[discoveredPeers], setDiscoveredPeer] = useState([new Map<string, PeerMetadata>()]);
const [[pairingStatus], setPairingStatus] = useState([new Map<number, PairingStatus>()]);
useBridgeSubscription(['p2p.events'], {
onData(data) {
events.current.dispatchEvent(new CustomEvent<P2PEvent>('p2p-event', { detail: data }));
if (data.type === 'DiscoveredPeer') {
setDiscoveredPeer([discoveredPeers.set(data.peer_id, data.metadata)]);
} else if (data.type === 'PairingProgress') {
setPairingStatus([pairingStatus.set(data.id, data.status)]);
}
}
});
return <Context.Provider value={discoveredPeers}>{children}</Context.Provider>;
return (
<Context.Provider
value={{
discoveredPeers,
pairingStatus,
events
}}
>
{children}
</Context.Provider>
);
}
export function useDiscoveredPeers() {
return useContext(Context);
return useContext(Context).discoveredPeers;
}
export function usePairingStatus(pairing_id: number) {
return useContext(Context).pairingStatus.get(pairing_id);
}
export function useP2PEvents(fn: (event: P2PEvent) => void) {
const ctx = useContext(Context);
useEffect(() => {
const handler = (e: Event) => {
fn((e as any).detail);
};
ctx.events.current.addEventListener('p2p-event', handler);
return () => ctx.events.current.removeEventListener('p2p-event', handler);
});
}

View file

@ -43,6 +43,7 @@
"react-loading-icons": "^1.1.0",
"react-router-dom": "6.9.0",
"tailwindcss-radix": "^2.6.0",
"ts-pattern": "^5.0.1",
"use-debounce": "^9.0.4",
"zod": "^3.21.4"
},

View file

@ -968,6 +968,9 @@ importers:
tailwindcss-radix:
specifier: ^2.6.0
version: 2.6.0
ts-pattern:
specifier: ^5.0.1
version: 5.0.1
use-debounce:
specifier: ^9.0.4
version: 9.0.4(react@18.2.0)
@ -23003,6 +23006,10 @@ packages:
resolution: {integrity: sha512-pefrkcd4lmIVR0LA49Imjf9DYLK8vtWhqBPA3Ya1ir8xCW0O2yjL9dsCVvI7pCodLC5q7smNpEtDR2yVulQxOg==}
dev: false
/ts-pattern@5.0.1:
resolution: {integrity: sha512-ZyNm28Lsg34Co5DS3e9DVyjlX2Y+2exkI4jqTKyU+9/OL6Y2fKOOuL8i+7no71o74C6mVS+UFoP3ekM3iCT1HQ==}
dev: false
/tsconfck@2.1.1(typescript@5.0.4):
resolution: {integrity: sha512-ZPCkJBKASZBmBUNqGHmRhdhM8pJYDdOXp4nRgj/O0JwUwsMq50lCDRQP/M5GBNAA0elPrq4gAeu4dkaVCuKWww==}
engines: {node: ^14.13.1 || ^16 || >=18}