diff --git a/Cargo.lock b/Cargo.lock index d00e4a601..2e8e3e341 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7102,6 +7102,7 @@ dependencies = [ "rmp-serde", "rmpv", "rspc", + "sd-core-sync", "sd-crypto", "sd-ffmpeg", "sd-file-ext", @@ -7109,6 +7110,7 @@ dependencies = [ "sd-p2p", "sd-prisma", "sd-sync", + "sd-utils", "serde", "serde-hashkey", "serde_json", @@ -7126,12 +7128,26 @@ dependencies = [ "tracing-appender", "tracing-subscriber 0.3.0", "tracing-test", - "uhlc", "uuid", "webp", "winapi-util", ] +[[package]] +name = "sd-core-sync" +version = "0.1.0" +dependencies = [ + "prisma-client-rust", + "sd-prisma", + "sd-sync", + "sd-utils", + "serde", + "serde_json", + "tokio", + "uhlc", + "uuid", +] + [[package]] name = "sd-crypto" version = "0.0.0" @@ -7328,6 +7344,13 @@ dependencies = [ "thiserror", ] +[[package]] +name = "sd-utils" +version = "0.1.0" +dependencies = [ + "uuid", +] + [[package]] name = "sdp" version = "0.5.3" diff --git a/Cargo.toml b/Cargo.toml index 0081489f5..83680a706 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ "core", + "core/crates/*", "crates/*", # "crates/p2p/tunnel", # "crates/p2p/tunnel/utils", @@ -42,6 +43,9 @@ tauri-specta = { version = "1.0.2" } swift-rs = { version = "1.0.5" } tokio = { version = "1.28.2" } +uuid = { version = "1.3.3", features = ["v4", "serde"] } +serde = { version = "1.0" } +serde_json = { version = "1.0" } [patch.crates-io] # We use this patch so we can compile for the IOS simulator on M1 diff --git a/apps/mobile/src/screens/onboarding/CreatingLibrary.tsx b/apps/mobile/src/screens/onboarding/CreatingLibrary.tsx index c3b8d6473..5b9706086 100644 --- a/apps/mobile/src/screens/onboarding/CreatingLibrary.tsx +++ b/apps/mobile/src/screens/onboarding/CreatingLibrary.tsx @@ -50,8 +50,6 @@ const CreatingLibraryScreen = ({ navigation }: OnboardingStackScreenProps<'Creat const create = async () => { telemetryStore.shareTelemetry = obStore.shareTelemetry; createLibrary.mutate({ name: obStore.newLibraryName }); - - return; }; useEffect(() => { diff --git a/core/Cargo.toml b/core/Cargo.toml index e39d8618a..b6a40d3e6 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -15,7 +15,6 @@ mobile = [] # This feature controls whether the Spacedrive Core contains functionality which requires FFmpeg. ffmpeg = ["dep:sd-ffmpeg"] location-watcher = ["dep:notify"] -sync-messages = [] heif = ["dep:sd-heif"] [dependencies] @@ -31,6 +30,9 @@ sd-file-ext = { path = "../crates/file-ext" } sd-sync = { path = "../crates/sync" } sd-p2p = { path = "../crates/p2p", features = ["specta", "serde"] } sd-prisma = { path = "../crates/prisma" } +sd-utils = { path = "../crates/utils" } + +sd-core-sync = { path = "./crates/sync" } rspc = { workspace = true, features = [ "uuid", @@ -54,14 +56,14 @@ tokio = { workspace = true, features = [ base64 = "0.21.2" serde = { version = "1.0", features = ["derive"] } chrono = { version = "0.4.25", features = ["serde"] } -serde_json = "1.0" +serde_json = { workspace = true } futures = "0.3" rmp = "^0.8.11" rmp-serde = "^1.1.1" rmpv = "^1.0.0" blake3 = "1.3.3" hostname = "0.3.1" -uuid = { version = "1.3.3", features = ["v4", "serde"] } +uuid = { workspace = true } sysinfo = "0.28.4" thiserror = "1.0.40" include_dir = { version = "0.7.3", features = ["glob"] } @@ -78,7 +80,6 @@ ctor = "0.1.26" globset = { version = "^0.4.10", features = ["serde1"] } itertools = "^0.10.5" enumflags2 = "0.7.7" -uhlc = "0.5.2" http-range = "0.1.5" mini-moka = "0.10.0" serde_with = "2.3.3" diff --git a/core/crates/sync/Cargo.toml b/core/crates/sync/Cargo.toml new file mode 100644 index 000000000..217a37306 --- /dev/null +++ b/core/crates/sync/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "sd-core-sync" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = [] +emit-messages = [] + +[dependencies] +sd-prisma = { path = "../../../crates/prisma" } +sd-sync = { path = "../../../crates/sync" } +sd-utils = { path = "../../../crates/utils" } + +prisma-client-rust = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +uuid = { workspace = true } +uhlc = "0.5.2" diff --git a/core/crates/sync/src/lib.rs b/core/crates/sync/src/lib.rs new file mode 100644 index 000000000..ab72cb099 --- /dev/null +++ b/core/crates/sync/src/lib.rs @@ -0,0 +1,357 @@ +#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Brendan remove this once you've got error handling here + +use sd_prisma::{prisma::*, prisma_sync::ModelSyncData}; +use sd_sync::*; +use sd_utils::uuid_to_bytes; + +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; + +use serde_json::to_vec; +use tokio::sync::broadcast::{self, Receiver, Sender}; +use uhlc::{HLCBuilder, Timestamp, HLC, NTP64}; +use uuid::Uuid; + +pub use sd_prisma::prisma_sync; + +#[derive(Clone)] +pub enum SyncMessage { + Ingested(CRDTOperation), + Created(CRDTOperation), +} + +pub struct SyncManager { + db: Arc, + instance: Uuid, + _clocks: HashMap, + clock: HLC, + pub tx: Sender, +} + +impl SyncManager { + pub fn new(db: &Arc, instance: Uuid) -> (Self, Receiver) { + let (tx, rx) = broadcast::channel(64); + + ( + Self { + db: db.clone(), + instance, + clock: HLCBuilder::new().with_id(instance.into()).build(), + _clocks: Default::default(), + tx, + }, + rx, + ) + } + + pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>( + &self, + tx: &PrismaClient, + (_ops, queries): (Vec, I), + ) -> prisma_client_rust::Result<::ReturnValue> { + #[cfg(feature = "emit-messages")] + let res = { + macro_rules! variant { + ($var:ident, $variant:ident, $fn:ident) => { + let $var = _ops + .iter() + .filter_map(|op| match &op.typ { + CRDTOperationType::$variant(inner) => { + Some($fn(&op, &inner).to_query(tx)) + } + _ => None, + }) + .collect::>(); + }; + } + + variant!(shared, Shared, shared_op_db); + variant!(relation, Relation, relation_op_db); + + let (res, _) = tx._batch((queries, (shared, relation))).await?; + + for op in _ops { + self.tx.send(SyncMessage::Created(op)).ok(); + } + + res + }; + #[cfg(not(feature = "emit-messages"))] + let res = tx._batch([queries]).await?.remove(0); + + Ok(res) + } + + #[allow(unused_variables)] + pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>( + &self, + tx: &PrismaClient, + op: CRDTOperation, + query: Q, + ) -> prisma_client_rust::Result<::ReturnValue> { + #[cfg(feature = "emit-messages")] + let ret = { + macro_rules! exec { + ($fn:ident, $inner:ident) => { + tx._batch(($fn(&op, $inner).to_query(tx), query)).await?.1 + }; + } + + let ret = match &op.typ { + CRDTOperationType::Shared(inner) => exec!(shared_op_db, inner), + CRDTOperationType::Relation(inner) => exec!(relation_op_db, inner), + }; + + self.tx.send(SyncMessage::Created(op)).ok(); + + ret + }; + #[cfg(not(feature = "emit-messages"))] + let ret = tx._batch(vec![query]).await?.remove(0); + + Ok(ret) + } + + pub async fn get_ops(&self) -> prisma_client_rust::Result> { + let Self { db, .. } = self; + + shared_operation::include!(shared_include { + instance: select { pub_id } + }); + relation_operation::include!(relation_include { + instance: select { pub_id } + }); + + enum DbOperation { + Shared(shared_include::Data), + Relation(relation_include::Data), + } + + impl DbOperation { + fn timestamp(&self) -> NTP64 { + NTP64(match self { + Self::Shared(op) => op.timestamp, + Self::Relation(op) => op.timestamp, + } as u64) + } + + fn id(&self) -> Uuid { + Uuid::from_slice(match self { + Self::Shared(op) => &op.id, + Self::Relation(op) => &op.id, + }) + .unwrap() + } + + fn instance(&self) -> Uuid { + Uuid::from_slice(match self { + Self::Shared(op) => &op.instance.pub_id, + Self::Relation(op) => &op.instance.pub_id, + }) + .unwrap() + } + + fn into_operation(self) -> CRDTOperation { + CRDTOperation { + id: self.id(), + instance: self.instance(), + timestamp: self.timestamp(), + typ: match self { + Self::Shared(op) => CRDTOperationType::Shared(SharedOperation { + record_id: serde_json::from_slice(&op.record_id).unwrap(), + model: op.model, + data: serde_json::from_slice(&op.data).unwrap(), + }), + Self::Relation(op) => CRDTOperationType::Relation(RelationOperation { + relation: op.relation, + data: serde_json::from_slice(&op.data).unwrap(), + relation_item: serde_json::from_slice(&op.item_id).unwrap(), + relation_group: serde_json::from_slice(&op.group_id).unwrap(), + }), + }, + } + } + } + + let (shared, relation) = db + ._batch(( + db.shared_operation() + .find_many(vec![]) + .include(shared_include::include()), + db.relation_operation() + .find_many(vec![]) + .include(relation_include::include()), + )) + .await?; + + let mut ops = BTreeMap::new(); + + ops.extend( + shared + .into_iter() + .map(DbOperation::Shared) + .map(|op| (op.timestamp(), op)), + ); + ops.extend( + relation + .into_iter() + .map(DbOperation::Relation) + .map(|op| (op.timestamp(), op)), + ); + + Ok(ops + .into_values() + .map(DbOperation::into_operation) + .rev() + .collect()) + } + + pub async fn apply_op(&self, op: CRDTOperation) -> prisma_client_rust::Result<()> { + ModelSyncData::from_op(op.typ.clone()) + .unwrap() + .exec(&self.db) + .await?; + + match &op.typ { + CRDTOperationType::Shared(shared_op) => { + shared_op_db(&op, shared_op) + .to_query(&self.db) + .exec() + .await?; + } + CRDTOperationType::Relation(relation_op) => { + relation_op_db(&op, relation_op) + .to_query(&self.db) + .exec() + .await?; + } + } + + self.tx.send(SyncMessage::Ingested(op.clone())).ok(); + + Ok(()) + } + + async fn compare_message(&self, op: &CRDTOperation) -> bool { + let old_timestamp = match &op.typ { + CRDTOperationType::Shared(shared_op) => { + let newer_op = self + .db + .shared_operation() + .find_first(vec![ + shared_operation::timestamp::gte(op.timestamp.as_u64() as i64), + shared_operation::model::equals(shared_op.model.to_string()), + shared_operation::record_id::equals( + serde_json::to_vec(&shared_op.record_id).unwrap(), + ), + shared_operation::kind::equals(shared_op.kind().to_string()), + ]) + .order_by(shared_operation::timestamp::order(SortOrder::Desc)) + .exec() + .await + .unwrap(); + + newer_op.map(|newer_op| newer_op.timestamp) + } + CRDTOperationType::Relation(relation_op) => { + let newer_op = self + .db + .relation_operation() + .find_first(vec![ + relation_operation::timestamp::gte(op.timestamp.as_u64() as i64), + relation_operation::relation::equals(relation_op.relation.to_string()), + relation_operation::item_id::equals( + serde_json::to_vec(&relation_op.relation_item).unwrap(), + ), + relation_operation::kind::equals(relation_op.kind().to_string()), + ]) + .order_by(relation_operation::timestamp::order(SortOrder::Desc)) + .exec() + .await + .unwrap(); + + newer_op.map(|newer_op| newer_op.timestamp) + } + }; + + old_timestamp + .map(|old| old != op.timestamp.as_u64() as i64) + .unwrap_or_default() + } + + pub async fn receive_crdt_operation(&mut self, op: CRDTOperation) { + self.clock + .update_with_timestamp(&Timestamp::new(op.timestamp, op.instance.into())) + .ok(); + + let timestamp = self + ._clocks + .entry(op.instance) + .or_insert_with(|| op.timestamp); + + if *timestamp < op.timestamp { + *timestamp = op.timestamp; + } + + let op_timestamp = op.timestamp; + let op_instance = op.instance; + + let is_old = self.compare_message(&op).await; + + if !is_old { + self.apply_op(op).await.ok(); + } + + self.db + .instance() + .update( + instance::pub_id::equals(uuid_to_bytes(op_instance)), + vec![instance::timestamp::set(Some(op_timestamp.as_u64() as i64))], + ) + .exec() + .await + .ok(); + } +} + +fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> shared_operation::Create { + shared_operation::Create { + id: op.id.as_bytes().to_vec(), + timestamp: op.timestamp.0 as i64, + instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()), + kind: shared_op.kind().to_string(), + data: to_vec(&shared_op.data).unwrap(), + model: shared_op.model.to_string(), + record_id: to_vec(&shared_op.record_id).unwrap(), + _params: vec![], + } +} + +fn relation_op_db( + op: &CRDTOperation, + relation_op: &RelationOperation, +) -> relation_operation::Create { + relation_operation::Create { + id: op.id.as_bytes().to_vec(), + timestamp: op.timestamp.0 as i64, + instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()), + kind: relation_op.kind().to_string(), + data: to_vec(&relation_op.data).unwrap(), + relation: relation_op.relation.to_string(), + item_id: to_vec(&relation_op.relation_item).unwrap(), + group_id: to_vec(&relation_op.relation_group).unwrap(), + _params: vec![], + } +} + +impl OperationFactory for SyncManager { + fn get_clock(&self) -> &HLC { + &self.clock + } + + fn get_instance(&self) -> Uuid { + self.instance + } +} diff --git a/core/prisma/migrations/20230724131659_relation_operation/migration.sql b/core/prisma/migrations/20230724131659_relation_operation/migration.sql new file mode 100644 index 000000000..4acfb50ff --- /dev/null +++ b/core/prisma/migrations/20230724131659_relation_operation/migration.sql @@ -0,0 +1,15 @@ +-- AlterTable +ALTER TABLE "instance" ADD COLUMN "timestamp" BIGINT; + +-- CreateTable +CREATE TABLE "relation_operation" ( + "id" BLOB NOT NULL PRIMARY KEY, + "timestamp" BIGINT NOT NULL, + "relation" TEXT NOT NULL, + "item_id" BLOB NOT NULL, + "group_id" BLOB NOT NULL, + "kind" TEXT NOT NULL, + "data" BLOB NOT NULL, + "instance_id" INTEGER NOT NULL, + CONSTRAINT "relation_operation_instance_id_fkey" FOREIGN KEY ("instance_id") REFERENCES "instance" ("id") ON DELETE RESTRICT ON UPDATE CASCADE +); diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index afcdaea14..0b9ca1414 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -36,6 +36,23 @@ model SharedOperation { @@map("shared_operation") } +model RelationOperation { + id Bytes @id + timestamp BigInt + relation String + + item_id Bytes + group_id Bytes + + kind String + data Bytes + + instance_id Int + instance Instance @relation(fields: [instance_id], references: [id]) + + @@map("relation_operation") +} + /// @deprecated: This model has to exist solely for backwards compatibility. model Node { id Int @id @default(autoincrement()) @@ -67,9 +84,13 @@ model Instance { last_seen DateTime // Time core started for owner, last P2P message for P2P node date_created DateTime + // clock timestamp for sync + timestamp BigInt? + // attestation Bytes SharedOperation SharedOperation[] + RelationOperation RelationOperation[] @@map("instance") } diff --git a/core/src/api/libraries.rs b/core/src/api/libraries.rs index d293240a9..fc1bbc681 100644 --- a/core/src/api/libraries.rs +++ b/core/src/api/libraries.rs @@ -94,6 +94,8 @@ pub(crate) fn mount() -> AlphaRouter { .create(args.name, None, ctx.config.get().await) .await?; + debug!("Created library {}", new_library.uuid); + Ok(new_library) }) }) diff --git a/core/src/api/p2p.rs b/core/src/api/p2p.rs index aac6dba69..f40021187 100644 --- a/core/src/api/p2p.rs +++ b/core/src/api/p2p.rs @@ -80,7 +80,7 @@ pub(crate) fn mount() -> AlphaRouter { ctx.p2p .pairing .clone() - .originator(id, ctx.config.get().await) + .originator(id, ctx.config.get().await, ctx.library_manager.clone()) .await }) }) diff --git a/core/src/api/search.rs b/core/src/api/search.rs index 17f2914ee..c0f50cdc2 100644 --- a/core/src/api/search.rs +++ b/core/src/api/search.rs @@ -10,7 +10,6 @@ use crate::{ }, object::preview::get_thumb_key, prisma::{self, file_path, location, object, tag, tag_on_object, PrismaClient}, - util::db::chain_optional_iter, }; use std::collections::BTreeSet; @@ -159,7 +158,7 @@ impl FilePathFilterArgs { { use file_path::*; - Ok(chain_optional_iter( + Ok(sd_utils::chain_optional_iter( self.search .unwrap_or_default() .split(' ') @@ -248,7 +247,7 @@ impl ObjectFilterArgs { fn into_params(self) -> Vec { use object::*; - chain_optional_iter( + sd_utils::chain_optional_iter( [], [ self.hidden.to_param(), diff --git a/core/src/api/sync.rs b/core/src/api/sync.rs index ccf63667b..1ea144fca 100644 --- a/core/src/api/sync.rs +++ b/core/src/api/sync.rs @@ -1,6 +1,5 @@ use rspc::alpha::AlphaRouter; - -use crate::sync::SyncMessage; +use sd_core_sync::SyncMessage; use super::{utils::library, Ctx, R}; diff --git a/core/src/api/tags.rs b/core/src/api/tags.rs index a41427c30..dc10c1cf7 100644 --- a/core/src/api/tags.rs +++ b/core/src/api/tags.rs @@ -1,5 +1,7 @@ use chrono::Utc; use rspc::{alpha::AlphaRouter, ErrorCode}; +use sd_prisma::prisma_sync; +use sd_sync::OperationFactory; use serde::Deserialize; use specta::Type; @@ -9,8 +11,7 @@ use crate::{ invalidate_query, library::Library, object::tag::TagCreateArgs, - prisma::{tag, tag_on_object}, - sync::{self, OperationFactory}, + prisma::{object, tag, tag_on_object}, }; use super::{utils::library, Ctx, R}; @@ -66,29 +67,72 @@ pub(crate) fn mount() -> AlphaRouter { R.with2(library()) .mutation(|(_, library), args: TagAssignArgs| async move { - let Library { db, .. } = library.as_ref(); + let Library { db, sync, .. } = library.as_ref(); + + let (tag, objects) = db + ._batch(( + db.tag() + .find_unique(tag::id::equals(args.tag_id)) + .select(tag::select!({ pub_id })), + db.object() + .find_many(vec![object::id::in_vec(args.object_ids)]) + .select(object::select!({ id pub_id })), + )) + .await?; + + let tag = tag.ok_or_else(|| { + rspc::Error::new(ErrorCode::NotFound, "Tag not found".to_string()) + })?; + + macro_rules! sync_id { + ($object:ident) => { + prisma_sync::tag_on_object::SyncId { + tag: prisma_sync::tag::SyncId { + pub_id: tag.pub_id.clone(), + }, + object: prisma_sync::object::SyncId { + pub_id: $object.pub_id.clone(), + }, + } + }; + } if args.unassign { - db.tag_on_object() - .delete_many(vec![ - tag_on_object::tag_id::equals(args.tag_id), - tag_on_object::object_id::in_vec(args.object_ids), - ]) - .exec() - .await?; - } else { - db.tag_on_object() - .create_many( - args.object_ids - .iter() - .map(|&object_id| tag_on_object::CreateUnchecked { - tag_id: args.tag_id, - object_id, - _params: vec![], - }) + let query = db.tag_on_object().delete_many(vec![ + tag_on_object::tag_id::equals(args.tag_id), + tag_on_object::object_id::in_vec( + objects.iter().map(|o| o.id).collect(), + ), + ]); + + sync.write_ops( + db, + ( + objects + .into_iter() + .map(|object| sync.relation_delete(sync_id!(object))) .collect(), - ) - .exec() + query, + ), + ) + .await?; + } else { + let (sync_ops, db_creates) = objects.into_iter().fold( + (vec![], vec![]), + |(mut sync_ops, mut db_creates), object| { + db_creates.push(tag_on_object::CreateUnchecked { + tag_id: args.tag_id, + object_id: object.id, + _params: vec![], + }); + + sync_ops.extend(sync.relation_create(sync_id!(object), [])); + + (sync_ops, db_creates) + }, + ); + + sync.write_ops(db, (sync_ops, db.tag_on_object().create_many(db_creates))) .await?; } @@ -139,7 +183,7 @@ pub(crate) fn mount() -> AlphaRouter { .flatten() .map(|(k, v)| { sync.shared_update( - sync::tag::SyncId { + prisma_sync::tag::SyncId { pub_id: tag.pub_id.clone(), }, k, diff --git a/core/src/job/report.rs b/core/src/job/report.rs index 9f89e4130..862fefd06 100644 --- a/core/src/job/report.rs +++ b/core/src/job/report.rs @@ -1,7 +1,7 @@ use crate::{ library::Library, prisma::job, - util::db::{chain_optional_iter, maybe_missing, MissingFieldError}, + util::db::{maybe_missing, MissingFieldError}, }; use std::fmt::{Display, Formatter}; @@ -203,7 +203,7 @@ impl JobReport { .job() .create( self.id.as_bytes().to_vec(), - chain_optional_iter( + sd_utils::chain_optional_iter( [ job::name::set(Some(self.name.clone())), job::action::set(self.action.clone()), diff --git a/core/src/lib.rs b/core/src/lib.rs index c813469e6..5fa41baac 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -39,7 +39,6 @@ pub(crate) mod node; pub(crate) mod object; pub(crate) mod p2p; pub(crate) mod preferences; -pub(crate) mod sync; pub(crate) mod util; pub(crate) mod volume; @@ -47,6 +46,7 @@ pub struct NodeContext { pub config: Arc, pub job_manager: Arc, pub location_manager: Arc, + pub p2p: Arc, pub event_bus_tx: broadcast::Sender, pub notifications: Arc, } @@ -81,28 +81,32 @@ impl Node { debug!("Initialised 'NodeConfigManager'..."); let job_manager = JobManager::new(); - debug!("Initialised 'JobManager'..."); let notifications = NotificationManager::new(); + debug!("Initialised 'NotificationManager'..."); let location_manager = LocationManager::new(); debug!("Initialised 'LocationManager'..."); + + let (p2p, p2p_stream) = P2PManager::new(config.clone()).await?; + debug!("Initialised 'P2PManager'..."); + let library_manager = LibraryManager::new( data_dir.join("libraries"), Arc::new(NodeContext { config: config.clone(), job_manager: job_manager.clone(), location_manager: location_manager.clone(), - // p2p: p2p.clone(), + p2p: p2p.clone(), event_bus_tx: event_bus.0.clone(), notifications: notifications.clone(), }), ) .await?; debug!("Initialised 'LibraryManager'..."); - let p2p = P2PManager::new(config.clone(), library_manager.clone()).await?; - debug!("Initialised 'P2PManager'..."); + + p2p.start(p2p_stream, library_manager.clone()); #[cfg(debug_assertions)] if let Some(init_data) = init_data { diff --git a/core/src/library/config.rs b/core/src/library/config.rs index 6e0d0fd4c..288bd07e6 100644 --- a/core/src/library/config.rs +++ b/core/src/library/config.rs @@ -2,7 +2,7 @@ use crate::{ node::{NodeConfig, Platform}, prisma::{file_path, indexer_rule, PrismaClient}, util::{ - db::{maybe_missing, uuid_to_bytes}, + db::maybe_missing, migrator::{Migrate, MigratorError}, }, }; @@ -65,9 +65,9 @@ impl Migrate for LibraryConfig { .map(|(i, name)| { db.indexer_rule().update_many( vec![indexer_rule::name::equals(Some(name))], - vec![indexer_rule::pub_id::set(uuid_to_bytes(Uuid::from_u128( - i as u128, - )))], + vec![indexer_rule::pub_id::set(sd_utils::uuid_to_bytes( + Uuid::from_u128(i as u128), + ))], ) }) .collect::>(), @@ -185,6 +185,7 @@ impl Migrate for LibraryConfig { node_platform: Platform::current() as i32, last_seen: now, date_created: node.map(|n| n.date_created).unwrap_or_else(|| now), + // timestamp: Default::default(), // TODO: Source this properly! _params: vec![], } .to_query(db) diff --git a/core/src/library/library.rs b/core/src/library/library.rs index 16c3eabd6..63aa7b444 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -9,11 +9,11 @@ use crate::{ }, node::NodeConfigManager, object::{ - orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path, + orphan_remover::OrphanRemoverActor, + preview::{get_thumbnail_path, THUMBNAIL_CACHE_DIR_NAME}, thumbnail_remover::ThumbnailRemoverActor, }, prisma::{file_path, location, PrismaClient}, - sync::SyncManager, util::{db::maybe_missing, error::FileIOError}, NodeContext, }; @@ -26,13 +26,14 @@ use std::{ }; use chrono::{DateTime, Utc}; +use sd_core_sync::{SyncManager, SyncMessage}; use sd_p2p::spacetunnel::Identity; use sd_prisma::prisma::notification; use tokio::{fs, io}; use tracing::warn; use uuid::Uuid; -use super::{LibraryConfig, LibraryManagerError}; +use super::{LibraryConfig, LibraryManager, LibraryManagerError}; /// LibraryContext holds context for a library which can be passed around the application. pub struct Library { @@ -42,7 +43,7 @@ pub struct Library { pub config: LibraryConfig, /// db holds the database client for the current library. pub db: Arc, - pub sync: Arc, + pub sync: Arc, /// key manager that provides encryption keys to functions that require them // pub key_manager: Arc, /// node_context holds the node context for the node which this library is running on. @@ -66,6 +67,53 @@ impl Debug for Library { } impl Library { + pub fn new( + id: Uuid, + instance_id: Uuid, + config: LibraryConfig, + identity: Arc, + db: Arc, + library_manager: Arc, + // node_context: Arc, + ) -> Self { + let (sync_manager, mut sync_rx) = SyncManager::new(&db, instance_id); + let node_context = library_manager.node_context.clone(); + + let library = Self { + orphan_remover: OrphanRemoverActor::spawn(db.clone()), + thumbnail_remover: ThumbnailRemoverActor::spawn( + db.clone(), + node_context + .config + .data_directory() + .join(THUMBNAIL_CACHE_DIR_NAME), + ), + id, + db, + config, + node_context, + // key_manager, + sync: Arc::new(sync_manager), + identity: identity.clone(), + }; + + tokio::spawn({ + async move { + while let Ok(op) = sync_rx.recv().await { + let SyncMessage::Created(op) = op else { continue; }; + + library_manager + .node_context + .p2p + .broadcast_sync_events(id, &identity, vec![op], &library_manager) + .await; + } + } + }); + + library + } + pub(crate) fn emit(&self, event: CoreEvent) { if let Err(e) = self.node_context.event_bus_tx.send(event) { warn!("Error sending event to event bus: {e:?}"); diff --git a/core/src/library/manager.rs b/core/src/library/manager.rs index f38ea94fd..bd8f18eca 100644 --- a/core/src/library/manager.rs +++ b/core/src/library/manager.rs @@ -2,12 +2,8 @@ use crate::{ invalidate_query, location::{indexer, LocationManagerError}, node::{NodeConfig, Platform}, - object::{ - orphan_remover::OrphanRemoverActor, preview::THUMBNAIL_CACHE_DIR_NAME, tag, - thumbnail_remover::ThumbnailRemoverActor, - }, + object::tag, prisma::location, - sync::{SyncManager, SyncMessage}, util::{ db::{self, MissingFieldError}, error::{FileIOError, NonUtf8PathError}, @@ -27,33 +23,12 @@ use chrono::Utc; use sd_p2p::spacetunnel::{Identity, IdentityErr}; use sd_prisma::prisma::instance; use thiserror::Error; -use tokio::{ - fs, io, - sync::{broadcast, RwLock}, - try_join, -}; +use tokio::{fs, io, sync::RwLock, try_join}; use tracing::{debug, error, info, warn}; use uuid::Uuid; use super::{Library, LibraryConfig, LibraryConfigWrapped, LibraryName}; -pub enum SubscriberEvent { - Load(Uuid, Arc, broadcast::Receiver), -} - -impl Clone for SubscriberEvent { - fn clone(&self) -> Self { - match self { - Self::Load(id, identity, receiver) => { - Self::Load(*id, identity.clone(), receiver.resubscribe()) - } - } - } -} - -pub trait SubscriberFn: Fn(SubscriberEvent) + Send + Sync + 'static {} -impl SubscriberFn for F {} - /// LibraryManager is a singleton that manages all libraries for a node. pub struct LibraryManager { /// libraries_dir holds the path to the directory where libraries are stored. @@ -61,9 +36,7 @@ pub struct LibraryManager { /// libraries holds the list of libraries which are currently loaded into the node. libraries: RwLock>>, /// node_context holds the context for the node which this library manager is running on. - node_context: Arc, - /// on load subscribers - subscribers: RwLock>>, + pub node_context: Arc, } #[derive(Error, Debug)] @@ -121,12 +94,16 @@ impl LibraryManager { .await .map_err(|e| FileIOError::from((&libraries_dir, e)))?; - let mut libraries = Vec::new(); - let subscribers = RwLock::new(Vec::new()); let mut read_dir = fs::read_dir(&libraries_dir) .await .map_err(|e| FileIOError::from((&libraries_dir, e)))?; + let this = Arc::new(Self { + libraries_dir: libraries_dir.clone(), + libraries: Default::default(), + node_context, + }); + while let Some(entry) = read_dir .next_entry() .await @@ -164,44 +141,17 @@ impl LibraryManager { Err(e) => return Err(FileIOError::from((db_path, e)).into()), } - libraries.push( - Self::load( - library_id, - &db_path, - config_path, - node_context.clone(), - &subscribers, - None, - true, - ) - .await?, - ); + this.load(library_id, &db_path, config_path, None, true) + .await?; } } - Ok(Arc::new(Self { - libraries: RwLock::new(libraries), - libraries_dir, - node_context, - subscribers, - })) - } - - /// subscribe to library events - pub(crate) async fn subscribe(&self, f: F) { - self.subscribers.write().await.push(Box::new(f)); - } - - async fn emit(subscribers: &RwLock>>, event: SubscriberEvent) { - let subscribers = subscribers.read().await; - for subscriber in subscribers.iter() { - subscriber(event.clone()); - } + Ok(this) } /// create creates a new library with the given config and mounts it into the running [LibraryManager]. pub(crate) async fn create( - &self, + self: &Arc, name: LibraryName, description: Option, node_cfg: NodeConfig, @@ -211,7 +161,7 @@ impl LibraryManager { } pub(crate) async fn create_with_uuid( - &self, + self: &Arc, id: Uuid, name: LibraryName, description: Option, @@ -240,25 +190,25 @@ impl LibraryManager { ); let now = Utc::now().fixed_offset(); - let library = Self::load( - id, - self.libraries_dir.join(format!("{id}.db")), - config_path, - self.node_context.clone(), - &self.subscribers, - Some(instance::Create { - pub_id: Uuid::new_v4().as_bytes().to_vec(), - identity: Identity::new().to_bytes(), - node_id: node_cfg.id.as_bytes().to_vec(), - node_name: node_cfg.name.clone(), - node_platform: Platform::current() as i32, - last_seen: now, - date_created: now, - _params: vec![instance::id::set(config.instance_id)], - }), - should_seed, - ) - .await?; + let library = self + .load( + id, + self.libraries_dir.join(format!("{id}.db")), + config_path, + Some(instance::Create { + pub_id: Uuid::new_v4().as_bytes().to_vec(), + identity: Identity::new().to_bytes(), + node_id: node_cfg.id.as_bytes().to_vec(), + node_name: node_cfg.name.clone(), + node_platform: Platform::current() as i32, + last_seen: now, + date_created: now, + // timestamp: Default::default(), // TODO: Source this properly! + _params: vec![instance::id::set(config.instance_id)], + }), + should_seed, + ) + .await?; debug!("Loaded library '{id:?}'"); @@ -270,8 +220,6 @@ impl LibraryManager { invalidate_query!(library, "library.list"); - self.libraries.write().await.push(library); - debug!("Pushed library into manager '{id:?}'"); Ok(LibraryConfigWrapped { uuid: id, config }) @@ -293,9 +241,9 @@ impl LibraryManager { .collect() } - pub(crate) async fn get_all_instances(&self) -> Vec { - vec![] // TODO: Cache in memory - } + // pub(crate) async fn get_all_instances(&self) -> Vec { + // vec![] // TODO: Cache in memory + // } pub(crate) async fn edit( &self, @@ -396,11 +344,10 @@ impl LibraryManager { /// load the library from a given path async fn load( + self: &Arc, id: Uuid, db_path: impl AsRef, config_path: PathBuf, - node_context: Arc, - subscribers: &RwLock>>, create: Option, should_seed: bool, ) -> Result, LibraryManagerError> { @@ -417,7 +364,7 @@ impl LibraryManager { create.to_query(&db).exec().await?; } - let node_config = node_context.config.get().await; + let node_config = self.node_context.config.get().await; let config = LibraryConfig::load_and_migrate(&config_path, &(node_config.clone(), db.clone())) .await?; @@ -462,31 +409,15 @@ impl LibraryManager { // let key_manager = Arc::new(KeyManager::new(vec![]).await?); // seed_keymanager(&db, &key_manager).await?; - let (sync_manager, sync_rx) = SyncManager::new(&db, instance_id); - - Self::emit( - subscribers, - SubscriberEvent::Load(id, identity.clone(), sync_rx), - ) - .await; - - let library = Arc::new(Library { + let library = Arc::new(Library::new( id, + instance_id, config, - // key_manager, - sync: Arc::new(sync_manager), - orphan_remover: OrphanRemoverActor::spawn(db.clone()), - thumbnail_remover: ThumbnailRemoverActor::spawn( - db.clone(), - node_context - .config - .data_directory() - .join(THUMBNAIL_CACHE_DIR_NAME), - ), - db, - node_context, identity, - }); + // key_manager, + db, + self.clone(), + )); if should_seed { library.orphan_remover.invoke().await; diff --git a/core/src/location/file_path_helper/mod.rs b/core/src/location/file_path_helper/mod.rs index 082aa2925..66c9f5ab4 100644 --- a/core/src/location/file_path_helper/mod.rs +++ b/core/src/location/file_path_helper/mod.rs @@ -182,9 +182,8 @@ pub async fn create_file_path( metadata: FilePathMetadata, ) -> Result { use crate::util::db::{device_to_db, inode_to_db}; - use crate::{sync, util::db::uuid_to_bytes}; - use sd_prisma::prisma; + use sd_prisma::{prisma, prisma_sync}; use sd_sync::OperationFactory; use serde_json::json; use uuid::Uuid; @@ -203,7 +202,7 @@ pub async fn create_file_path( vec![ ( location::NAME, - json!(sync::location::SyncId { + json!(prisma_sync::location::SyncId { pub_id: location.pub_id }), ), @@ -223,14 +222,14 @@ pub async fn create_file_path( ] }; - let pub_id = uuid_to_bytes(Uuid::new_v4()); + let pub_id = sd_utils::uuid_to_bytes(Uuid::new_v4()); let created_path = sync .write_ops( db, ( sync.shared_create( - sync::file_path::SyncId { + prisma_sync::file_path::SyncId { pub_id: pub_id.clone(), }, params, diff --git a/core/src/location/indexer/mod.rs b/core/src/location/indexer/mod.rs index f8449462d..7a0497e7c 100644 --- a/core/src/location/indexer/mod.rs +++ b/core/src/location/indexer/mod.rs @@ -1,9 +1,8 @@ use crate::{ library::Library, prisma::{file_path, location, PrismaClient}, - sync, util::{ - db::{device_to_db, inode_to_db, uuid_to_bytes}, + db::{device_to_db, inode_to_db}, error::FileIOError, }, }; @@ -12,6 +11,7 @@ use std::path::Path; use chrono::Utc; use rspc::ErrorCode; +use sd_prisma::prisma_sync; use sd_sync::*; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -103,13 +103,13 @@ async fn execute_indexer_save_step( use file_path::*; - let pub_id = uuid_to_bytes(entry.pub_id); + let pub_id = sd_utils::uuid_to_bytes(entry.pub_id); let (sync_params, db_params): (Vec<_>, Vec<_>) = [ ( ( location::NAME, - json!(sync::location::SyncId { + json!(prisma_sync::location::SyncId { pub_id: pub_id.clone() }), ), @@ -160,8 +160,8 @@ async fn execute_indexer_save_step( ( sync.shared_create( - sync::file_path::SyncId { - pub_id: uuid_to_bytes(entry.pub_id), + prisma_sync::file_path::SyncId { + pub_id: sd_utils::uuid_to_bytes(entry.pub_id), }, sync_params, ), @@ -199,7 +199,7 @@ async fn execute_indexer_update_step( use file_path::*; - let pub_id = uuid_to_bytes(entry.pub_id); + let pub_id = sd_utils::uuid_to_bytes(entry.pub_id); let (sync_params, db_params): (Vec<_>, Vec<_>) = [ // As this file was updated while Spacedrive was offline, we mark the object_id as null @@ -243,7 +243,7 @@ async fn execute_indexer_update_step( .into_iter() .map(|(field, value)| { sync.shared_update( - sync::file_path::SyncId { + prisma_sync::file_path::SyncId { pub_id: pub_id.clone(), }, field, diff --git a/core/src/location/indexer/rules/mod.rs b/core/src/location/indexer/rules/mod.rs index 2b286c123..449c57a9b 100644 --- a/core/src/location/indexer/rules/mod.rs +++ b/core/src/location/indexer/rules/mod.rs @@ -4,7 +4,7 @@ use crate::{ library::Library, prisma::indexer_rule, util::{ - db::{maybe_missing, uuid_to_bytes, MissingFieldError}, + db::{maybe_missing, MissingFieldError}, error::{FileIOError, NonUtf8PathError}, }, }; @@ -135,7 +135,7 @@ impl IndexerRuleCreateArgs { .db .indexer_rule() .create( - uuid_to_bytes(generate_pub_id()), + sd_utils::uuid_to_bytes(generate_pub_id()), vec![ name::set(Some(self.name)), rules_per_kind::set(Some(rules_data)), diff --git a/core/src/location/indexer/rules/seed.rs b/core/src/location/indexer/rules/seed.rs index 40489569a..d2ed421b1 100644 --- a/core/src/location/indexer/rules/seed.rs +++ b/core/src/location/indexer/rules/seed.rs @@ -1,7 +1,6 @@ use crate::{ library::Library, location::indexer::rules::{IndexerRuleError, RulePerKind}, - util::db::uuid_to_bytes, }; use chrono::Utc; use sd_prisma::prisma::indexer_rule; @@ -29,7 +28,7 @@ pub async fn new_or_existing_library(library: &Library) -> Result<(), SeederErro .into_iter() .enumerate() { - let pub_id = uuid_to_bytes(Uuid::from_u128(i as u128)); + let pub_id = sd_utils::uuid_to_bytes(Uuid::from_u128(i as u128)); let rules = rmp_serde::to_vec_named(&rule.rules).map_err(IndexerRuleError::from)?; use indexer_rule::*; diff --git a/core/src/location/indexer/walk.rs b/core/src/location/indexer/walk.rs index cd64a98c9..85b370055 100644 --- a/core/src/location/indexer/walk.rs +++ b/core/src/location/indexer/walk.rs @@ -5,7 +5,7 @@ use crate::{ }, prisma::file_path, util::{ - db::{device_from_db, from_bytes_to_uuid, inode_from_db}, + db::{device_from_db, inode_from_db}, error::FileIOError, }, }; @@ -385,7 +385,9 @@ where ) - *date_modified > Duration::milliseconds(1) { - to_update.push((from_bytes_to_uuid(&file_path.pub_id), entry).into()); + to_update.push( + (sd_utils::from_bytes_to_uuid(&file_path.pub_id), entry).into(), + ); } } diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index 2ec986850..aab522331 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -20,7 +20,6 @@ use crate::{ validation::hash::file_checksum, }, prisma::{file_path, location, object}, - sync::{self, OperationFactory}, util::{ db::{device_from_db, device_to_db, inode_from_db, inode_to_db, maybe_missing}, error::FileIOError, @@ -47,6 +46,8 @@ use sd_file_ext::extensions::ImageExtension; use chrono::{DateTime, Local, Utc}; use notify::Event; use prisma_client_rust::{raw, PrismaValue}; +use sd_prisma::prisma_sync; +use sd_sync::OperationFactory; use serde_json::json; use tokio::{fs, io::ErrorKind}; use tracing::{debug, error, trace, warn}; @@ -508,7 +509,7 @@ async fn inner_update_file( .into_iter() .map(|(field, value)| { sync.shared_update( - sync::file_path::SyncId { + prisma_sync::file_path::SyncId { pub_id: file_path.pub_id.clone(), }, field, @@ -544,7 +545,7 @@ async fn inner_update_file( sync.write_op( db, sync.shared_update( - sync::object::SyncId { + prisma_sync::object::SyncId { pub_id: object.pub_id.clone(), }, object::kind::NAME, diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index dacdd45a0..004e5b5da 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -8,8 +8,7 @@ use crate::{ preview::{shallow_thumbnailer, thumbnailer_job::ThumbnailerJobInit}, }, prisma::{file_path, indexer_rules_in_location, location, PrismaClient}, - sync, - util::{db::chain_optional_iter, error::FileIOError}, + util::error::FileIOError, }; use std::{ @@ -22,6 +21,7 @@ use chrono::Utc; use futures::future::TryFutureExt; use normpath::PathExt; use prisma_client_rust::{operator::and, or, QueryError}; +use sd_prisma::prisma_sync; use sd_sync::*; use serde::Deserialize; use serde_json::json; @@ -274,7 +274,7 @@ impl LocationUpdateArgs { .into_iter() .map(|p| { sync.shared_update( - sync::location::SyncId { + prisma_sync::location::SyncId { pub_id: location.pub_id.clone(), }, p.0, @@ -483,7 +483,7 @@ pub async fn relink_location( sync.write_op( db, sync.shared_update( - sync::location::SyncId { + prisma_sync::location::SyncId { pub_id: pub_id.clone(), }, location::path::NAME, @@ -588,7 +588,7 @@ async fn create_location( db, ( sync.shared_create( - sync::location::SyncId { + prisma_sync::location::SyncId { pub_id: location_pub_id.as_bytes().to_vec(), }, [ @@ -597,7 +597,7 @@ async fn create_location( (location::date_created::NAME, json!(date_created)), ( location::instance_id::NAME, - json!(sync::instance::SyncId { + json!(prisma_sync::instance::SyncId { pub_id: vec![], // id: library.config.instance_id, }), @@ -697,7 +697,7 @@ pub async fn delete_directory( ) -> Result<(), QueryError> { let Library { db, .. } = library; - let children_params = chain_optional_iter( + let children_params = sd_utils::chain_optional_iter( [file_path::location_id::equals(Some(location_id))], [parent_iso_file_path.and_then(|parent| { parent diff --git a/core/src/object/file_identifier/file_identifier_job.rs b/core/src/object/file_identifier/file_identifier_job.rs index 56a3feb94..96798e709 100644 --- a/core/src/object/file_identifier/file_identifier_job.rs +++ b/core/src/object/file_identifier/file_identifier_job.rs @@ -9,7 +9,7 @@ use crate::{ file_path_for_file_identifier, IsolatedFilePathData, }, prisma::{file_path, location, PrismaClient, SortOrder}, - util::db::{chain_optional_iter, maybe_missing}, + util::db::maybe_missing, }; use std::{ @@ -238,7 +238,7 @@ fn orphan_path_filters( file_path_id: Option, maybe_sub_iso_file_path: &Option>, ) -> Vec { - chain_optional_iter( + sd_utils::chain_optional_iter( [ file_path::object_id::equals(None), file_path::is_dir::equals(Some(false)), diff --git a/core/src/object/file_identifier/mod.rs b/core/src/object/file_identifier/mod.rs index 6934599c7..d0de8b1f5 100644 --- a/core/src/object/file_identifier/mod.rs +++ b/core/src/object/file_identifier/mod.rs @@ -6,14 +6,13 @@ use crate::{ }, object::{cas::generate_cas_id, object_for_file_identifier}, prisma::{file_path, location, object, PrismaClient}, - sync::{self, CRDTOperation, OperationFactory, SyncManager}, - util::{ - db::{maybe_missing, uuid_to_bytes}, - error::FileIOError, - }, + util::{db::maybe_missing, error::FileIOError}, }; +use sd_core_sync::SyncManager; use sd_file_ext::{extensions::Extension, kind::ObjectKind}; +use sd_prisma::prisma_sync; +use sd_sync::{CRDTOperation, OperationFactory}; use std::{ collections::{HashMap, HashSet}, @@ -138,14 +137,14 @@ async fn identifier_job_step( .map(|(pub_id, (meta, _))| { ( sync.shared_update( - sync::file_path::SyncId { - pub_id: uuid_to_bytes(*pub_id), + prisma_sync::file_path::SyncId { + pub_id: sd_utils::uuid_to_bytes(*pub_id), }, file_path::cas_id::NAME, json!(&meta.cas_id), ), db.file_path().update( - file_path::pub_id::equals(uuid_to_bytes(*pub_id)), + file_path::pub_id::equals(sd_utils::uuid_to_bytes(*pub_id)), vec![file_path::cas_id::set(Some(meta.cas_id.clone()))], ), ) @@ -230,8 +229,8 @@ async fn identifier_job_step( .map(|(file_path_pub_id, (meta, fp))| { let object_pub_id = Uuid::new_v4(); - let sync_id = || sync::object::SyncId { - pub_id: uuid_to_bytes(object_pub_id), + let sync_id = || prisma_sync::object::SyncId { + pub_id: sd_utils::uuid_to_bytes(object_pub_id), }; let kind = meta.kind as i32; @@ -251,7 +250,7 @@ async fn identifier_job_step( let object_creation_args = ( sync.shared_create(sync_id(), sync_params), - object::create_unchecked(uuid_to_bytes(object_pub_id), db_params), + object::create_unchecked(sd_utils::uuid_to_bytes(object_pub_id), db_params), ); (object_creation_args, { @@ -319,16 +318,16 @@ fn file_path_object_connect_ops<'db>( ( sync.shared_update( - sync::file_path::SyncId { - pub_id: uuid_to_bytes(file_path_id), + prisma_sync::file_path::SyncId { + pub_id: sd_utils::uuid_to_bytes(file_path_id), }, file_path::object::NAME, - json!(sync::object::SyncId { + json!(prisma_sync::object::SyncId { pub_id: vec_id.clone() }), ), db.file_path().update( - file_path::pub_id::equals(uuid_to_bytes(file_path_id)), + file_path::pub_id::equals(sd_utils::uuid_to_bytes(file_path_id)), vec![file_path::object::connect(object::pub_id::equals(vec_id))], ), ) diff --git a/core/src/object/file_identifier/shallow.rs b/core/src/object/file_identifier/shallow.rs index d08807bc8..2fd0f4f1a 100644 --- a/core/src/object/file_identifier/shallow.rs +++ b/core/src/object/file_identifier/shallow.rs @@ -7,7 +7,7 @@ use crate::{ file_path_for_file_identifier, IsolatedFilePathData, }, prisma::{file_path, location, PrismaClient, SortOrder}, - util::db::{chain_optional_iter, maybe_missing}, + util::db::maybe_missing, }; use std::path::{Path, PathBuf}; @@ -120,7 +120,7 @@ fn orphan_path_filters( file_path_id: Option, sub_iso_file_path: &IsolatedFilePathData<'_>, ) -> Vec { - chain_optional_iter( + sd_utils::chain_optional_iter( [ file_path::object_id::equals(None), file_path::is_dir::equals(Some(false)), diff --git a/core/src/object/tag/mod.rs b/core/src/object/tag/mod.rs index 70dce8174..ae51cd5b7 100644 --- a/core/src/object/tag/mod.rs +++ b/core/src/object/tag/mod.rs @@ -1,6 +1,7 @@ pub mod seed; use chrono::{DateTime, FixedOffset, Utc}; +use sd_prisma::prisma_sync; use sd_sync::*; use serde::Deserialize; use serde_json::json; @@ -8,7 +9,7 @@ use specta::Type; use uuid::Uuid; -use crate::{library::Library, prisma::tag, sync}; +use crate::{library::Library, prisma::tag}; #[derive(Type, Deserialize, Clone)] pub struct TagCreateArgs { @@ -28,7 +29,7 @@ impl TagCreateArgs { db, ( sync.shared_create( - sync::tag::SyncId { + prisma_sync::tag::SyncId { pub_id: pub_id.clone(), }, [ diff --git a/core/src/object/validation/validator_job.rs b/core/src/object/validation/validator_job.rs index b1ec3af4b..c77038c59 100644 --- a/core/src/object/validation/validator_job.rs +++ b/core/src/object/validation/validator_job.rs @@ -8,11 +8,7 @@ use crate::{ file_path_for_object_validator, IsolatedFilePathData, }, prisma::{file_path, location}, - sync::{self, OperationFactory}, - util::{ - db::{chain_optional_iter, maybe_missing}, - error::FileIOError, - }, + util::{db::maybe_missing, error::FileIOError}, }; use std::{ @@ -20,6 +16,8 @@ use std::{ path::{Path, PathBuf}, }; +use sd_prisma::prisma_sync; +use sd_sync::OperationFactory; use serde::{Deserialize, Serialize}; use serde_json::json; use tracing::info; @@ -101,7 +99,7 @@ impl StatefulJob for ObjectValidatorJobInit { let steps = db .file_path() - .find_many(chain_optional_iter( + .find_many(sd_utils::chain_optional_iter( [ file_path::location_id::equals(Some(init.location.id)), file_path::is_dir::equals(Some(false)), @@ -153,7 +151,7 @@ impl StatefulJob for ObjectValidatorJobInit { sync.write_op( db, sync.shared_update( - sync::file_path::SyncId { + prisma_sync::file_path::SyncId { pub_id: file_path.pub_id.clone(), }, file_path::integrity_checksum::NAME, diff --git a/core/src/p2p/p2p_manager.rs b/core/src/p2p/p2p_manager.rs index 0d6c22384..dd2887627 100644 --- a/core/src/p2p/p2p_manager.rs +++ b/core/src/p2p/p2p_manager.rs @@ -10,7 +10,7 @@ use futures::Stream; use sd_p2p::{ spaceblock::{BlockSize, SpaceblockRequest, Transfer}, spacetunnel::{Identity, Tunnel}, - Event, Manager, ManagerError, MetadataManager, PeerId, + Event, Manager, ManagerError, ManagerStream, MetadataManager, PeerId, }; use sd_sync::CRDTOperation; use serde::Serialize; @@ -25,10 +25,9 @@ use tracing::{debug, error, info, warn}; use uuid::Uuid; use crate::{ - library::{LibraryManager, SubscriberEvent}, + library::LibraryManager, node::{NodeConfig, NodeConfigManager}, p2p::{OperatingSystem, SPACEDRIVE_APP_ID}, - sync::SyncMessage, }; use super::{Header, PairingManager, PairingStatus, PeerMetadata}; @@ -69,25 +68,23 @@ pub struct P2PManager { pub metadata_manager: Arc>, pub spacedrop_progress: Arc>>>, pub pairing: Arc, - library_manager: Arc, } impl P2PManager { pub async fn new( node_config: Arc, - library_manager: Arc, - ) -> Result, ManagerError> { + ) -> Result<(Arc, ManagerStream), ManagerError> { let (config, keypair) = { let config = node_config.get().await; ( - Self::config_to_metadata(&config, &library_manager).await, + Self::config_to_metadata(&config /* , &library_manager */).await, config.keypair, ) }; let metadata_manager = MetadataManager::new(config); - let (manager, mut stream) = + let (manager, stream) = Manager::new(SPACEDRIVE_APP_ID, &keypair, metadata_manager.clone()).await?; info!( @@ -102,13 +99,55 @@ impl P2PManager { let spacedrop_pairing_reqs = Arc::new(Mutex::new(HashMap::new())); let spacedrop_progress = Arc::new(Mutex::new(HashMap::new())); - let pairing = PairingManager::new(manager.clone(), tx.clone(), library_manager.clone()); + let pairing = PairingManager::new(manager.clone(), tx.clone()); + + // TODO: proper shutdown + // https://docs.rs/ctrlc/latest/ctrlc/ + // https://docs.rs/system_shutdown/latest/system_shutdown/ + + let this = Arc::new(Self { + pairing, + events: (tx, rx), + manager, + spacedrop_pairing_reqs, + metadata_manager, + spacedrop_progress, + }); + + // library_manager + // .subscribe({ + // let this = this.clone(); + // move |event| match event { + // SubscriberEvent::Load(library_id, library_identity, mut sync_rx) => { + // let this = this.clone(); + // } + // } + // }) + // .await; + + // TODO: Probs remove this once connection timeout/keepalive are working correctly tokio::spawn({ - let events = tx.clone(); - let spacedrop_pairing_reqs = spacedrop_pairing_reqs.clone(); - let spacedrop_progress = spacedrop_progress.clone(); - let library_manager = library_manager.clone(); - let pairing = pairing.clone(); + let this = this.clone(); + async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + this.ping().await; + } + } + }); + + Ok((this, stream)) + } + pub fn start( + &self, + mut stream: ManagerStream, + library_manager: Arc, + ) { + tokio::spawn({ + let events = self.events.0.clone(); + let spacedrop_pairing_reqs = self.spacedrop_pairing_reqs.clone(); + let spacedrop_progress = self.spacedrop_progress.clone(); + let pairing = self.pairing.clone(); async move { let mut shutdown = false; @@ -204,7 +243,9 @@ impl P2PManager { }; } Header::Pair => { - pairing.responder(event.peer_id, stream).await; + pairing + .responder(event.peer_id, stream, library_manager) + .await; } Header::Sync(library_id) => { let mut stream = Tunnel::from_stream(stream).await.unwrap(); @@ -228,14 +269,12 @@ impl P2PManager { }; for op in operations { - library.sync.ingest_op(op).await.unwrap_or_else( - |err| { - error!( - "error ingesting operation for library '{}': {err:?}", - library.id - ); - }, - ); + library.sync.apply_op(op).await.unwrap_or_else(|err| { + error!( + "error ingesting operation for library '{}': {err:?}", + library.id + ); + }); } } } @@ -259,57 +298,11 @@ impl P2PManager { } } }); - - // TODO: proper shutdown - // https://docs.rs/ctrlc/latest/ctrlc/ - // https://docs.rs/system_shutdown/latest/system_shutdown/ - - let this = Arc::new(Self { - pairing, - events: (tx, rx), - manager, - spacedrop_pairing_reqs, - metadata_manager, - spacedrop_progress, - library_manager: library_manager.clone(), - }); - - library_manager - .subscribe({ - let this = this.clone(); - move |event| match event { - SubscriberEvent::Load(library_id, library_identity, mut sync_rx) => { - let this = this.clone(); - tokio::spawn(async move { - while let Ok(op) = sync_rx.recv().await { - let SyncMessage::Created(op) = op else { continue; }; - - this.broadcast_sync_events(library_id, &library_identity, vec![op]) - .await; - } - }); - } - } - }) - .await; - - // TODO: Probs remove this once connection timeout/keepalive are working correctly - tokio::spawn({ - let this = this.clone(); - async move { - loop { - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - this.ping().await; - } - } - }); - - Ok(this) } async fn config_to_metadata( config: &NodeConfig, - library_manager: &LibraryManager, + // library_manager: &LibraryManager, ) -> PeerMetadata { PeerMetadata { name: config.name.clone(), @@ -317,24 +310,27 @@ impl P2PManager { version: Some(env!("CARGO_PKG_VERSION").to_string()), email: config.p2p_email.clone(), img_url: config.p2p_img_url.clone(), - instances: library_manager - .get_all_instances() - .await - .into_iter() - .filter_map(|i| { - Identity::from_bytes(&i.identity) - .map(|i| hex::encode(i.public_key().to_bytes())) - .ok() - }) - .collect(), + // instances: library_manager + // .get_all_instances() + // .await + // .into_iter() + // .filter_map(|i| { + // Identity::from_bytes(&i.identity) + // .map(|i| hex::encode(i.public_key().to_bytes())) + // .ok() + // }) + // .collect(), } } #[allow(unused)] // TODO: Should probs be using this - pub async fn update_metadata(&self, node_config_manager: &NodeConfigManager) { - self.metadata_manager.update( - Self::config_to_metadata(&node_config_manager.get().await, &self.library_manager).await, - ); + pub async fn update_metadata( + &self, + node_config_manager: &NodeConfigManager, + library_manager: &LibraryManager, + ) { + self.metadata_manager + .update(Self::config_to_metadata(&node_config_manager.get().await).await); } pub async fn accept_spacedrop(&self, id: Uuid, path: String) { @@ -358,7 +354,10 @@ impl P2PManager { library_id: Uuid, _identity: &Identity, event: Vec, + library_manager: &LibraryManager, ) { + println!("broadcasting sync events!"); + let mut buf = match rmp_serde::to_vec_named(&event) { Ok(buf) => buf, Err(e) => { @@ -374,7 +373,7 @@ impl P2PManager { // TODO: Establish a connection to them - let _library = self.library_manager.get_library(library_id).await.unwrap(); + let _library = library_manager.get_library(library_id).await.unwrap(); todo!(); diff --git a/core/src/p2p/pairing/mod.rs b/core/src/p2p/pairing/mod.rs index 9d1ba8666..70809cf98 100644 --- a/core/src/p2p/pairing/mod.rs +++ b/core/src/p2p/pairing/mod.rs @@ -38,21 +38,21 @@ pub struct PairingManager { events_tx: broadcast::Sender, pairing_response: RwLock>>, manager: Arc>, - library_manager: Arc, + // library_manager: Arc, } impl PairingManager { pub fn new( manager: Arc>, events_tx: broadcast::Sender, - library_manager: Arc, + // library_manager: Arc, ) -> Arc { Arc::new(Self { id: AtomicU16::new(0), events_tx, pairing_response: RwLock::new(HashMap::new()), manager, - library_manager, + // library_manager, }) } @@ -70,7 +70,12 @@ impl PairingManager { // TODO: Error handling - pub async fn originator(self: Arc, peer_id: PeerId, node_config: NodeConfig) -> u16 { + pub async fn originator( + self: Arc, + peer_id: PeerId, + node_config: NodeConfig, + library_manager: Arc, + ) -> u16 { // TODO: Timeout for max number of pairings in a time period let pairing_id = self.id.fetch_add(1, Ordering::SeqCst); @@ -119,8 +124,7 @@ impl PairingManager { // TODO: Future - Library in pairing state // TODO: Create library - if self - .library_manager + if library_manager .get_all_libraries() .await .into_iter() @@ -134,8 +138,7 @@ impl PairingManager { return; } - let library_config = self - .library_manager + let library_config = library_manager .create_with_uuid( library_id, LibraryName::new(library_name).unwrap(), @@ -145,8 +148,7 @@ impl PairingManager { ) .await .unwrap(); - let library = self - .library_manager + let library = library_manager .get_library(library_config.uuid) .await .unwrap(); @@ -207,6 +209,7 @@ impl PairingManager { self: Arc, peer_id: PeerId, mut stream: impl AsyncRead + AsyncWrite + Unpin, + library_manager: Arc, ) { let pairing_id = self.id.fetch_add(1, Ordering::SeqCst); self.emit_progress(pairing_id, PairingStatus::EstablishingConnection); @@ -239,7 +242,7 @@ impl PairingManager { }; info!("The user accepted pairing '{pairing_id}' for library '{library_id}'!"); - let library = self.library_manager.get_library(library_id).await.unwrap(); + let library = library_manager.get_library(library_id).await.unwrap(); stream .write_all( &PairingResponse::Accepted { diff --git a/core/src/p2p/pairing/proto.rs b/core/src/p2p/pairing/proto.rs index 2019e6092..8d1c8d361 100644 --- a/core/src/p2p/pairing/proto.rs +++ b/core/src/p2p/pairing/proto.rs @@ -40,6 +40,7 @@ impl From for instance::CreateUnchecked { node_platform: i.node_platform as i32, last_seen: i.last_seen.into(), date_created: i.date_created.into(), + // timestamp: Default::default(), // TODO: Source this properly! _params: vec![], } } diff --git a/core/src/p2p/peer_metadata.rs b/core/src/p2p/peer_metadata.rs index 6aaa673cf..ccddfca80 100644 --- a/core/src/p2p/peer_metadata.rs +++ b/core/src/p2p/peer_metadata.rs @@ -1,6 +1,5 @@ use std::{collections::HashMap, env, str::FromStr}; -use itertools::Itertools; use sd_p2p::Metadata; use serde::{Deserialize, Serialize}; use specta::Type; @@ -14,7 +13,7 @@ pub struct PeerMetadata { pub(super) version: Option, pub(super) email: Option, pub(super) img_url: Option, - pub(super) instances: Vec, + // pub(super) instances: Vec, } impl Metadata for PeerMetadata { @@ -33,7 +32,7 @@ impl Metadata for PeerMetadata { if let Some(img_url) = self.img_url { map.insert("img_url".to_owned(), img_url); } - map.insert("instances".to_owned(), self.instances.into_iter().join(",")); + // map.insert("instances".to_owned(), self.instances.into_iter().join(",")); map } @@ -56,15 +55,15 @@ impl Metadata for PeerMetadata { version: data.get("version").map(|v| v.to_owned()), email: data.get("email").map(|v| v.to_owned()), img_url: data.get("img_url").map(|v| v.to_owned()), - instances: data - .get("instances") - .ok_or_else(|| { - "DNS record for field 'instances' missing. Unable to decode 'PeerMetadata'!" - .to_owned() - })? - .split(',') - .map(|s| s.parse().map_err(|_| "Unable to parse instance 'Uuid'!")) - .collect::, _>>()?, + // instances: data + // .get("instances") + // .ok_or_else(|| { + // "DNS record for field 'instances' missing. Unable to decode 'PeerMetadata'!" + // .to_owned() + // })? + // .split(',') + // .map(|s| s.parse().map_err(|_| "Unable to parse instance 'Uuid'!")) + // .collect::, _>>()?, }) } } diff --git a/core/src/preferences/kv.rs b/core/src/preferences/kv.rs index f7927a239..8ca78875d 100644 --- a/core/src/preferences/kv.rs +++ b/core/src/preferences/kv.rs @@ -107,7 +107,7 @@ impl PreferenceKVs { self } - pub fn to_upserts(self, db: &PrismaClient) -> Vec { + pub fn into_upserts(self, db: &PrismaClient) -> Vec { self.0 .into_iter() .map(|(key, value)| { diff --git a/core/src/preferences/mod.rs b/core/src/preferences/mod.rs index 29b3465b4..7dabf1d61 100644 --- a/core/src/preferences/mod.rs +++ b/core/src/preferences/mod.rs @@ -25,7 +25,7 @@ impl LibraryPreferences { pub async fn write(self, db: &PrismaClient) -> prisma_client_rust::Result<()> { let kvs = self.to_kvs(); - db._batch(kvs.to_upserts(db)).await?; + db._batch(kvs.into_upserts(db)).await?; Ok(()) } diff --git a/core/src/sync/manager.rs b/core/src/sync/manager.rs deleted file mode 100644 index 76039a864..000000000 --- a/core/src/sync/manager.rs +++ /dev/null @@ -1,220 +0,0 @@ -#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Brendan remove this once you've got error handling here - -use crate::prisma::*; -use sd_sync::*; - -use std::{collections::HashMap, sync::Arc}; - -use serde_json::to_vec; -use tokio::sync::broadcast::{self, Receiver, Sender}; -use uhlc::{HLCBuilder, HLC, NTP64}; -use uuid::Uuid; - -use super::ModelSyncData; - -#[derive(Clone)] -pub enum SyncMessage { - Ingested(CRDTOperation), - Created(CRDTOperation), -} - -pub struct SyncManager { - db: Arc, - instance: Uuid, - _clocks: HashMap, - clock: HLC, - pub tx: Sender, -} - -impl SyncManager { - pub fn new(db: &Arc, instance: Uuid) -> (Self, Receiver) { - let (tx, rx) = broadcast::channel(64); - - ( - Self { - db: db.clone(), - instance, - clock: HLCBuilder::new().with_id(instance.into()).build(), - _clocks: Default::default(), - tx, - }, - rx, - ) - } - - pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>( - &self, - tx: &PrismaClient, - (_ops, queries): (Vec, I), - ) -> prisma_client_rust::Result<::ReturnValue> { - #[cfg(feature = "sync-messages")] - let res = { - let shared = _ops - .iter() - .filter_map(|op| match &op.typ { - CRDTOperationType::Shared(shared_op) => { - let kind = match &shared_op.data { - SharedOperationData::Create => "c", - SharedOperationData::Update { .. } => "u", - SharedOperationData::Delete => "d", - }; - - Some(tx.shared_operation().create( - op.id.as_bytes().to_vec(), - op.timestamp.0 as i64, - shared_op.model.to_string(), - to_vec(&shared_op.record_id).unwrap(), - kind.to_string(), - to_vec(&shared_op.data).unwrap(), - instance::pub_id::equals(op.instance.as_bytes().to_vec()), - vec![], - )) - } - _ => None, - }) - .collect::>(); - - let (res, _) = tx._batch((queries, shared)).await?; - - for op in _ops { - self.tx.send(SyncMessage::Created(op)).ok(); - } - - res - }; - #[cfg(not(feature = "sync-messages"))] - let res = tx._batch([queries]).await?.remove(0); - - Ok(res) - } - - #[allow(unused_variables)] - pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>( - &self, - tx: &PrismaClient, - op: CRDTOperation, - query: Q, - ) -> prisma_client_rust::Result<::ReturnValue> { - #[cfg(feature = "sync-messages")] - let ret = { - let ret = match &op.typ { - CRDTOperationType::Shared(shared_op) => { - let kind = match &shared_op.data { - SharedOperationData::Create => "c", - SharedOperationData::Update { .. } => "u", - SharedOperationData::Delete => "d", - }; - - tx._batch(( - tx.shared_operation().create( - op.id.as_bytes().to_vec(), - op.timestamp.0 as i64, - shared_op.model.to_string(), - to_vec(&shared_op.record_id).unwrap(), - kind.to_string(), - to_vec(&shared_op.data).unwrap(), - instance::pub_id::equals(op.instance.as_bytes().to_vec()), - vec![], - ), - query, - )) - .await? - .1 - } - _ => todo!(), - }; - - self.tx.send(SyncMessage::Created(op)).ok(); - - ret - }; - #[cfg(not(feature = "sync-messages"))] - let ret = tx._batch(vec![query]).await?.remove(0); - - Ok(ret) - } - - pub async fn get_ops(&self) -> prisma_client_rust::Result> { - Ok(self - .db - .shared_operation() - .find_many(vec![]) - .order_by(shared_operation::timestamp::order(SortOrder::Asc)) - .include(shared_operation::include!({ instance: select { - pub_id - } })) - .exec() - .await? - .into_iter() - .flat_map(|op| { - Some(CRDTOperation { - id: Uuid::from_slice(&op.id).ok()?, - instance: Uuid::from_slice(&op.instance.pub_id).ok()?, - timestamp: NTP64(op.timestamp as u64), - typ: CRDTOperationType::Shared(SharedOperation { - record_id: serde_json::from_slice(&op.record_id).ok()?, - model: op.model, - data: serde_json::from_slice(&op.data).ok()?, - }), - }) - }) - .collect()) - } - - pub async fn ingest_op(&self, op: CRDTOperation) -> prisma_client_rust::Result<()> { - let db = &self.db; - - if db - .instance() - .find_unique(instance::pub_id::equals(op.instance.as_bytes().to_vec())) - .exec() - .await? - .is_none() - { - panic!("Node is not paired!") - } - - let msg = SyncMessage::Ingested(op.clone()); - - ModelSyncData::from_op(op.typ.clone()) - .unwrap() - .exec(db) - .await?; - - if let CRDTOperationType::Shared(shared_op) = op.typ { - let kind = match &shared_op.data { - SharedOperationData::Create => "c", - SharedOperationData::Update { .. } => "u", - SharedOperationData::Delete => "d", - }; - - db.shared_operation() - .create( - op.id.as_bytes().to_vec(), - op.timestamp.0 as i64, - shared_op.model.to_string(), - to_vec(&shared_op.record_id).unwrap(), - kind.to_string(), - to_vec(&shared_op.data).unwrap(), - instance::pub_id::equals(op.instance.as_bytes().to_vec()), - vec![], - ) - .exec() - .await?; - } - - self.tx.send(msg).ok(); - - Ok(()) - } -} - -impl OperationFactory for SyncManager { - fn get_clock(&self) -> &HLC { - &self.clock - } - - fn get_instance(&self) -> Uuid { - self.instance - } -} diff --git a/core/src/sync/mod.rs b/core/src/sync/mod.rs deleted file mode 100644 index ca2377021..000000000 --- a/core/src/sync/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod manager; - -pub use crate::prisma_sync::*; -pub use manager::*; -pub use sd_sync::*; diff --git a/core/src/util/db.rs b/core/src/util/db.rs index df43a24e0..f07168269 100644 --- a/core/src/util/db.rs +++ b/core/src/util/db.rs @@ -1,7 +1,6 @@ use crate::prisma::{self, PrismaClient}; use prisma_client_rust::{migrations::*, NewClientError}; use thiserror::Error; -use uuid::Uuid; /// MigrationError represents an error that occurring while opening a initialising and running migrations on the database. #[derive(Error, Debug)] @@ -58,28 +57,6 @@ pub async fn load_and_migrate(db_url: &str) -> Result`, -/// removing any `None` values in the process -pub fn chain_optional_iter( - required: impl IntoIterator, - optional: impl IntoIterator>, -) -> Vec { - required - .into_iter() - .map(Some) - .chain(optional) - .flatten() - .collect() -} - -pub fn uuid_to_bytes(uuid: Uuid) -> Vec { - uuid.as_bytes().to_vec() -} - -pub fn from_bytes_to_uuid(bytes: &[u8]) -> Uuid { - Uuid::from_slice(bytes).expect("corrupted uuid in database") -} - pub fn inode_from_db(db_inode: &[u8]) -> u64 { u64::from_le_bytes(db_inode.try_into().expect("corrupted inode in database")) } diff --git a/core/src/util/debug_initializer.rs b/core/src/util/debug_initializer.rs index 378e44da1..6f4fbd82d 100644 --- a/core/src/util/debug_initializer.rs +++ b/core/src/util/debug_initializer.rs @@ -3,6 +3,7 @@ use std::{ io, path::{Path, PathBuf}, + sync::Arc, time::Duration, }; @@ -96,7 +97,7 @@ impl InitConfig { pub async fn apply( self, - library_manager: &LibraryManager, + library_manager: &Arc, node_cfg: NodeConfig, ) -> Result<(), InitConfigError> { info!("Initializing app from file: {:?}", self.path); diff --git a/crates/sync-generator/src/lib.rs b/crates/sync-generator/src/lib.rs index 2c3a3f0e5..5dbee5571 100644 --- a/crates/sync-generator/src/lib.rs +++ b/crates/sync-generator/src/lib.rs @@ -43,7 +43,7 @@ impl<'a> ModelSyncType<'a> { .field("id") .and_then(|field| match field { AttributeFieldValue::Single(s) => Some(s), - AttributeFieldValue::List(l) => None, + AttributeFieldValue::List(_) => None, }) .and_then(|name| model.fields().find(|f| f.name() == *name))?; @@ -58,20 +58,20 @@ impl<'a> ModelSyncType<'a> { attr.field(name) .and_then(|field| match field { AttributeFieldValue::Single(s) => Some(*s), - AttributeFieldValue::List(l) => None, + AttributeFieldValue::List(_) => None, }) .and_then(|name| { match model .fields() .find(|f| f.name() == name) - .expect(&format!("'{name}' field not found")) + .unwrap_or_else(|| panic!("'{name}' field not found")) .refine() { RefinedFieldWalker::Relation(r) => Some(r), _ => None, } }) - .expect(&format!("'{name}' must be a relation field")) + .unwrap_or_else(|| panic!("'{name}' must be a relation field")) }; Self::Relation { @@ -88,10 +88,9 @@ impl<'a> ModelSyncType<'a> { fn sync_id(&self) -> Vec { match self { // Self::Owned { id } => id.clone(), - Self::Local { id } => vec![id.clone()], - Self::Shared { id } => vec![id.clone()], + Self::Local { id } => vec![*id], + Self::Shared { id } => vec![*id], Self::Relation { group, item } => vec![(*group).into(), (*item).into()], - _ => vec![], } } } diff --git a/crates/sync/src/crdt.rs b/crates/sync/src/crdt.rs index e985b3cc0..4ff63e389 100644 --- a/crates/sync/src/crdt.rs +++ b/crates/sync/src/crdt.rs @@ -6,6 +6,36 @@ use specta::Type; use uhlc::NTP64; use uuid::Uuid; +pub enum OperationKind<'a> { + Create, + Update(&'a str), + Delete, +} + +impl std::fmt::Display for OperationKind<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OperationKind::Create => write!(f, "c"), + OperationKind::Update(field) => write!(f, "u:{}", field), + OperationKind::Delete => write!(f, "d"), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, Type)] +pub struct RelationOperation { + pub relation_item: Value, + pub relation_group: Value, + pub relation: String, + pub data: RelationOperationData, +} + +impl RelationOperation { + pub fn kind(&self) -> OperationKind { + self.data.as_kind() + } +} + #[derive(Serialize, Deserialize, Clone, Debug, Type)] pub enum RelationOperationData { #[serde(rename = "c")] @@ -16,12 +46,27 @@ pub enum RelationOperationData { Delete, } +impl RelationOperationData { + fn as_kind(&self) -> OperationKind { + match self { + Self::Create => OperationKind::Create, + Self::Update { field, .. } => OperationKind::Update(field), + Self::Delete => OperationKind::Delete, + } + } +} + #[derive(Serialize, Deserialize, Clone, Debug, Type)] -pub struct RelationOperation { - pub relation_item: Value, - pub relation_group: Value, - pub relation: String, - pub data: RelationOperationData, +pub struct SharedOperation { + pub record_id: Value, + pub model: String, + pub data: SharedOperationData, +} + +impl SharedOperation { + pub fn kind(&self) -> OperationKind { + self.data.as_kind() + } } #[derive(Serialize, Deserialize, Clone, Debug, Type)] @@ -34,11 +79,14 @@ pub enum SharedOperationData { Delete, } -#[derive(Serialize, Deserialize, Clone, Debug, Type)] -pub struct SharedOperation { - pub record_id: Value, - pub model: String, - pub data: SharedOperationData, +impl SharedOperationData { + fn as_kind(&self) -> OperationKind { + match self { + Self::Create => OperationKind::Create, + Self::Update { field, .. } => OperationKind::Update(field), + Self::Delete => OperationKind::Delete, + } + } } // #[derive(Serialize, Deserialize, Clone, Debug, Type)] diff --git a/crates/sync/src/lib.rs b/crates/sync/src/lib.rs index 603ccb127..7fb007988 100644 --- a/crates/sync/src/lib.rs +++ b/crates/sync/src/lib.rs @@ -5,3 +5,160 @@ mod model_traits; pub use crdt::*; pub use factory::*; pub use model_traits::*; + +// fn compare_messages(&self, operations: Vec) -> Vec<(CRDTOperation, bool)> { +// operations +// .into_iter() +// .map(|op| (op.id, op)) +// .collect::>() +// .into_iter() +// .filter_map(|(_, op)| { +// match &op.typ { +// CRDTOperationType::Owned(_) => { +// self._operations.iter().find(|find_op| match &find_op.typ { +// CRDTOperationType::Owned(_) => { +// find_op.timestamp >= op.timestamp && find_op.node == op.node +// } +// _ => false, +// }) +// } +// CRDTOperationType::Shared(shared_op) => { +// self._operations.iter().find(|find_op| match &find_op.typ { +// CRDTOperationType::Shared(find_shared_op) => { +// shared_op.model == find_shared_op.model +// && shared_op.record_id == find_shared_op.record_id +// && find_op.timestamp >= op.timestamp +// } +// _ => false, +// }) +// } +// CRDTOperationType::Relation(relation_op) => { +// self._operations.iter().find(|find_op| match &find_op.typ { +// CRDTOperationType::Relation(find_relation_op) => { +// relation_op.relation == find_relation_op.relation +// && relation_op.relation_item == find_relation_op.relation_item +// && relation_op.relation_group == find_relation_op.relation_group +// } +// _ => false, +// }) +// } +// } +// .map(|old_op| (old_op.timestamp != op.timestamp).then_some(true)) +// .unwrap_or(Some(false)) +// .map(|old| (op, old)) +// }) +// .collect() +// } + +// pub fn receive_crdt_operations(&mut self, ops: Vec) { +// for op in &ops { +// self._clock +// .update_with_timestamp(&Timestamp::new(op.timestamp, op.node.into())) +// .ok(); + +// self._clocks.insert(op.node, op.timestamp); +// } + +// for (op, old) in self.compare_messages(ops) { +// let push_op = op.clone(); + +// if !old { +// match op.typ { +// CRDTOperationType::Shared(shared_op) => match shared_op.model.as_str() { +// "Object" => { +// let id = shared_op.record_id; + +// match shared_op.data { +// SharedOperationData::Create(SharedOperationCreateData::Atomic) => { +// self.objects.insert( +// id, +// Object { +// id, +// ..Default::default() +// }, +// ); +// } +// SharedOperationData::Update { field, value } => { +// let mut file = self.objects.get_mut(&id).unwrap(); + +// match field.as_str() { +// "name" => { +// file.name = from_value(value).unwrap(); +// } +// _ => unreachable!(), +// } +// } +// SharedOperationData::Delete => { +// self.objects.remove(&id).unwrap(); +// } +// _ => {} +// } +// } +// _ => unreachable!(), +// }, +// CRDTOperationType::Owned(owned_op) => match owned_op.model.as_str() { +// "FilePath" => { +// for item in owned_op.items { +// let id = from_value(item.id).unwrap(); + +// match item.data { +// OwnedOperationData::Create(data) => { +// self.file_paths.insert( +// id, +// from_value(Value::Object(data.into_iter().collect())) +// .unwrap(), +// ); +// } +// OwnedOperationData::Update(data) => { +// let obj = self.file_paths.get_mut(&id).unwrap(); + +// for (key, value) in data { +// match key.as_str() { +// "path" => obj.path = from_value(value).unwrap(), +// "file" => obj.file = from_value(value).unwrap(), +// _ => unreachable!(), +// } +// } +// } +// OwnedOperationData::Delete => { +// self.file_paths.remove(&id); +// } +// } +// } +// } +// _ => unreachable!(), +// }, +// CRDTOperationType::Relation(relation_op) => match relation_op.relation.as_str() +// { +// "TagOnObject" => match relation_op.data { +// RelationOperationData::Create => { +// self.tags_on_objects.insert( +// (relation_op.relation_item, relation_op.relation_group), +// TagOnObject { +// object_id: relation_op.relation_item, +// tag_id: relation_op.relation_group, +// }, +// ); +// } +// RelationOperationData::Update { field: _, value: _ } => { +// // match field.as_str() { +// // _ => unreachable!(), +// // } +// } +// RelationOperationData::Delete => { +// self.tags_on_objects +// .remove(&( +// relation_op.relation_item, +// relation_op.relation_group, +// )) +// .unwrap(); +// } +// }, +// _ => unreachable!(), +// }, +// } + +// self._operations.push(push_op) +// } +// } +// } diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml new file mode 100644 index 000000000..825c16497 --- /dev/null +++ b/crates/utils/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "sd-utils" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +uuid = { workspace = true } diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs new file mode 100644 index 000000000..854304f5e --- /dev/null +++ b/crates/utils/src/lib.rs @@ -0,0 +1,23 @@ +use uuid::Uuid; + +/// Combines an iterator of `T` and an iterator of `Option`, +/// removing any `None` values in the process +pub fn chain_optional_iter( + required: impl IntoIterator, + optional: impl IntoIterator>, +) -> Vec { + required + .into_iter() + .map(Some) + .chain(optional) + .flatten() + .collect() +} + +pub fn uuid_to_bytes(uuid: Uuid) -> Vec { + uuid.as_bytes().to_vec() +} + +pub fn from_bytes_to_uuid(bytes: &[u8]) -> Uuid { + Uuid::from_slice(bytes).expect("corrupted uuid in database") +} diff --git a/interface/app/$libraryId/Layout/Sidebar/Contents.tsx b/interface/app/$libraryId/Layout/Sidebar/Contents.tsx index 15e632ebd..5612f820d 100644 --- a/interface/app/$libraryId/Layout/Sidebar/Contents.tsx +++ b/interface/app/$libraryId/Layout/Sidebar/Contents.tsx @@ -8,7 +8,7 @@ import { FilmStrip, Planet } from 'phosphor-react'; -import { LibraryContextProvider, useClientContext } from '@sd/client'; +import { LibraryContextProvider, useClientContext, useFeatureFlag } from '@sd/client'; import { SubtleButton } from '~/components/SubtleButton'; import Icon from './Icon'; import { LibrarySection } from './LibrarySection'; @@ -33,6 +33,12 @@ export default () => { Imports */} + {useFeatureFlag('syncRoute') && ( + + + Sync + + )} {library && ( diff --git a/interface/app/$libraryId/sync.tsx b/interface/app/$libraryId/sync.tsx index f3ab60987..b8916fec5 100644 --- a/interface/app/$libraryId/sync.tsx +++ b/interface/app/$libraryId/sync.tsx @@ -1,39 +1,27 @@ -import { CRDTOperation, useLibraryQuery, useLibrarySubscription } from '@sd/client'; -import { tw } from '@sd/ui'; +import { useMemo } from 'react'; +import { stringify } from 'uuid'; +import { + CRDTOperation, + RelationOperation, + SharedOperation, + useLibraryQuery, + useLibrarySubscription +} from '@sd/client'; -const Label = tw.span`text-gray-300`; -const Pill = tw.span`rounded-full bg-gray-500 px-2 py-1`; -const Row = tw.p`overflow-hidden text-ellipsis space-x-1`; - -const OperationItem = ({ op }: { op: CRDTOperation }) => { - let contents = null; - - if ('model' in op.typ) { - let subContents = null; - - if (op.typ.data === 'd') subContents = 'Delete'; - else if (op.typ.data === 'c') subContents = 'Create'; - else subContents = `Update - ${op.typ.data.u.field}`; - - contents = ( - <> -
- {subContents} -
- - - {op.typ.model} - - - - {op.timestamp} - - - ); - } - - return
  • {contents}
  • ; -}; +type MessageGroup = + | { + variant: 'Shared'; + model: string; + id: string; + messages: { op: SharedOperation; timestamp: number }[]; + } + | { + variant: 'Relation'; + relation: string; + item_id: string; + group_id: string; + messages: { op: RelationOperation; timestamp: number }[]; + }; export const Component = () => { const messages = useLibraryQuery(['sync.messages']); @@ -42,11 +30,156 @@ export const Component = () => { onData: () => messages.refetch() }); + const groups = useMemo(() => { + if (messages.data) return calculateGroups(messages.data); + }, [messages]); + return ( -
      - {messages.data?.map((op) => ( - +
        + {groups?.map((group, index) => ( + ))}
      ); }; + +const OperationGroup: React.FC<{ group: MessageGroup }> = ({ group }) => { + const [header, contents] = (() => { + switch (group.variant) { + case 'Shared': { + const header = ( +
      + {group.model} + {group.id} +
      + ); + const contents = ( +
        + {group.messages.map((message, index) => ( +
      • + {typeof message.op.data === 'string' ? ( +

        {message.op.data === 'c' ? 'Create' : 'Delete'}

        + ) : ( +

        Update - {message.op.data.u.field}

        + )} +

        {message.timestamp}

        +
      • + ))} +
      + ); + return [header, contents]; + } + case 'Relation': { + const header = ( +
      + {group.relation} + {group.item_id} + in + {group.group_id} +
      + ); + + const contents = ( +
        + {group.messages.map((message, index) => ( +
      • + {typeof message.op.data === 'string' ? ( +

        {message.op.data === 'c' ? 'Create' : 'Delete'}

        + ) : ( +

        Update - {message.op.data.u.field}

        + )} +

        {message.timestamp}

        +
      • + ))} +
      + ); + + return [header, contents]; + } + } + })(); + + return ( +
      + {header} + {contents} +
      + ); +}; + +function calculateGroups(messages: CRDTOperation[]) { + return messages.reduce((acc, curr) => { + const { typ } = curr; + + if ('model' in typ) { + const id = stringify(typ.record_id.pub_id); + + const latest = (() => { + const latest = acc[acc.length - 1]; + + if ( + !latest || + latest.variant !== 'Shared' || + latest.model !== typ.model || + latest.id !== id + ) { + const group: MessageGroup = { + variant: 'Shared', + model: typ.model, + id, + messages: [] + }; + + acc.push(group); + + return group; + } else { + return latest; + } + })(); + + latest.messages.push({ + op: typ, + timestamp: curr.timestamp + }); + } else { + const id = { + item: stringify(typ.relation_item.pub_id), + group: stringify(typ.relation_group.pub_id) + }; + + const latest = (() => { + const latest = acc[acc.length - 1]; + + if ( + !latest || + latest.variant !== 'Relation' || + latest.relation !== typ.relation || + latest.item_id !== id.item || + latest.group_id !== id.group + ) { + const group: MessageGroup = { + variant: 'Relation', + relation: typ.relation, + item_id: id.item, + group_id: id.group, + messages: [] + }; + + acc.push(group); + + return group; + } else { + return latest; + } + })(); + + latest.messages.push({ + op: typ, + timestamp: curr.timestamp + }); + } + + return acc; + }, []); +} diff --git a/interface/app/onboarding/creating-library.tsx b/interface/app/onboarding/creating-library.tsx index 4159ae121..5295f7b1c 100644 --- a/interface/app/onboarding/creating-library.tsx +++ b/interface/app/onboarding/creating-library.tsx @@ -51,11 +51,10 @@ export default function OnboardingCreatingLibrary() { // it feels more fitting to configure it here (once) telemetryStore.shareTelemetry = obStore.shareTelemetry; + console.log('creating'); createLibrary.mutate({ name: obStore.newLibraryName }); - - return; }; const created = useRef(false); diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index f4b80f72c..a624b9933 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -261,7 +261,7 @@ export type PairingStatus = { type: "EstablishingConnection" } | { type: "Pairin export type PeerId = string -export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; version: string | null; email: string | null; img_url: string | null; instances: string[] } +export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; version: string | null; email: string | null; img_url: string | null } export type RelationOperation = { relation_item: any; relation_group: any; relation: string; data: RelationOperationData } diff --git a/packages/client/src/hooks/useFeatureFlag.tsx b/packages/client/src/hooks/useFeatureFlag.tsx index ec48c100c..e4a943f62 100644 --- a/packages/client/src/hooks/useFeatureFlag.tsx +++ b/packages/client/src/hooks/useFeatureFlag.tsx @@ -2,9 +2,9 @@ import { useEffect } from 'react'; import { subscribe, useSnapshot } from 'valtio'; import { valtioPersist } from '../lib/valito'; -export const features = ['spacedrop', 'p2pPairing'] as const; +export const features = ['spacedrop', 'p2pPairing', 'syncRoute'] as const; -export type FeatureFlag = (typeof features)[number]; +export type FeatureFlag = typeof features[number]; const featureFlagState = valtioPersist('sd-featureFlags', { enabled: [] as FeatureFlag[] diff --git a/packages/client/src/hooks/useNotifications.tsx b/packages/client/src/hooks/useNotifications.tsx index 252b62ae5..4eb49aff1 100644 --- a/packages/client/src/hooks/useNotifications.tsx +++ b/packages/client/src/hooks/useNotifications.tsx @@ -1,4 +1,4 @@ -import { PropsWithChildren, createContext, useContext, useState } from 'react'; +import { PropsWithChildren, createContext, useState } from 'react'; import { Notification } from '../core'; import { useBridgeSubscription } from '../rspc';