CRDTOperation receiving (#1122)

* operation receive + compare

* cleanup + deduplication

* operation receive + compare

* cleanup + deduplication

* sync route + operation grouping

* tag assign sync

* proper relation support in sync debug page

* migration

* separate core-sync + utils crates

* separate p2p event loop from manager

* cleanup library handling

* clippy

* feature gate sync messages properly

* make migration not add required field
This commit is contained in:
Brendan Allan 2023-07-24 23:26:00 +08:00 committed by GitHub
parent 23b0c403be
commit 043b607ad4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
51 changed files with 1238 additions and 642 deletions

25
Cargo.lock generated
View file

@ -7102,6 +7102,7 @@ dependencies = [
"rmp-serde",
"rmpv",
"rspc",
"sd-core-sync",
"sd-crypto",
"sd-ffmpeg",
"sd-file-ext",
@ -7109,6 +7110,7 @@ dependencies = [
"sd-p2p",
"sd-prisma",
"sd-sync",
"sd-utils",
"serde",
"serde-hashkey",
"serde_json",
@ -7126,12 +7128,26 @@ dependencies = [
"tracing-appender",
"tracing-subscriber 0.3.0",
"tracing-test",
"uhlc",
"uuid",
"webp",
"winapi-util",
]
[[package]]
name = "sd-core-sync"
version = "0.1.0"
dependencies = [
"prisma-client-rust",
"sd-prisma",
"sd-sync",
"sd-utils",
"serde",
"serde_json",
"tokio",
"uhlc",
"uuid",
]
[[package]]
name = "sd-crypto"
version = "0.0.0"
@ -7328,6 +7344,13 @@ dependencies = [
"thiserror",
]
[[package]]
name = "sd-utils"
version = "0.1.0"
dependencies = [
"uuid",
]
[[package]]
name = "sdp"
version = "0.5.3"

View file

@ -2,6 +2,7 @@
resolver = "2"
members = [
"core",
"core/crates/*",
"crates/*",
# "crates/p2p/tunnel",
# "crates/p2p/tunnel/utils",
@ -42,6 +43,9 @@ tauri-specta = { version = "1.0.2" }
swift-rs = { version = "1.0.5" }
tokio = { version = "1.28.2" }
uuid = { version = "1.3.3", features = ["v4", "serde"] }
serde = { version = "1.0" }
serde_json = { version = "1.0" }
[patch.crates-io]
# We use this patch so we can compile for the IOS simulator on M1

View file

@ -50,8 +50,6 @@ const CreatingLibraryScreen = ({ navigation }: OnboardingStackScreenProps<'Creat
const create = async () => {
telemetryStore.shareTelemetry = obStore.shareTelemetry;
createLibrary.mutate({ name: obStore.newLibraryName });
return;
};
useEffect(() => {

View file

@ -15,7 +15,6 @@ mobile = []
# This feature controls whether the Spacedrive Core contains functionality which requires FFmpeg.
ffmpeg = ["dep:sd-ffmpeg"]
location-watcher = ["dep:notify"]
sync-messages = []
heif = ["dep:sd-heif"]
[dependencies]
@ -31,6 +30,9 @@ sd-file-ext = { path = "../crates/file-ext" }
sd-sync = { path = "../crates/sync" }
sd-p2p = { path = "../crates/p2p", features = ["specta", "serde"] }
sd-prisma = { path = "../crates/prisma" }
sd-utils = { path = "../crates/utils" }
sd-core-sync = { path = "./crates/sync" }
rspc = { workspace = true, features = [
"uuid",
@ -54,14 +56,14 @@ tokio = { workspace = true, features = [
base64 = "0.21.2"
serde = { version = "1.0", features = ["derive"] }
chrono = { version = "0.4.25", features = ["serde"] }
serde_json = "1.0"
serde_json = { workspace = true }
futures = "0.3"
rmp = "^0.8.11"
rmp-serde = "^1.1.1"
rmpv = "^1.0.0"
blake3 = "1.3.3"
hostname = "0.3.1"
uuid = { version = "1.3.3", features = ["v4", "serde"] }
uuid = { workspace = true }
sysinfo = "0.28.4"
thiserror = "1.0.40"
include_dir = { version = "0.7.3", features = ["glob"] }
@ -78,7 +80,6 @@ ctor = "0.1.26"
globset = { version = "^0.4.10", features = ["serde1"] }
itertools = "^0.10.5"
enumflags2 = "0.7.7"
uhlc = "0.5.2"
http-range = "0.1.5"
mini-moka = "0.10.0"
serde_with = "2.3.3"

View file

@ -0,0 +1,21 @@
[package]
name = "sd-core-sync"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = []
# When enabled, SyncManager persists CRDT operations alongside the caller's
# queries and broadcasts SyncMessage events; with it disabled, only the raw
# queries are executed (see write_ops/write_op in src/lib.rs).
emit-messages = []

[dependencies]
# Workspace-internal crates.
sd-prisma = { path = "../../../crates/prisma" }
sd-sync = { path = "../../../crates/sync" }
sd-utils = { path = "../../../crates/utils" }

# Versions inherited from the workspace manifest.
prisma-client-rust = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }

# Hybrid logical clock used for CRDT operation timestamps.
uhlc = "0.5.2"

357
core/crates/sync/src/lib.rs Normal file
View file

@ -0,0 +1,357 @@
#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Brendan remove this once you've got error handling here
use sd_prisma::{prisma::*, prisma_sync::ModelSyncData};
use sd_sync::*;
use sd_utils::uuid_to_bytes;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};
use serde_json::to_vec;
use tokio::sync::broadcast::{self, Receiver, Sender};
use uhlc::{HLCBuilder, Timestamp, HLC, NTP64};
use uuid::Uuid;
pub use sd_prisma::prisma_sync;
/// Event broadcast on the [`SyncManager`]'s channel whenever a CRDT
/// operation enters the system.
#[derive(Clone)]
pub enum SyncMessage {
    /// An operation received from another instance was applied locally
    /// (emitted by `apply_op`).
    Ingested(CRDTOperation),
    /// An operation was created locally and written to the operation log
    /// (emitted by `write_op`/`write_ops` when `emit-messages` is enabled).
    Created(CRDTOperation),
}
/// Coordinates CRDT operation persistence, ingestion and broadcasting for a
/// single library database.
pub struct SyncManager {
    /// Handle to the library's database.
    db: Arc<PrismaClient>,
    /// This node's instance id; seeds the HLC and stamps created operations.
    instance: Uuid,
    /// Highest timestamp observed per remote instance (in-memory only;
    /// persisted separately on the `instance` table in
    /// `receive_crdt_operation`).
    _clocks: HashMap<Uuid, NTP64>,
    /// Hybrid logical clock used to mint and merge operation timestamps.
    clock: HLC,
    /// Broadcast sender for [`SyncMessage`] events.
    pub tx: Sender<SyncMessage>,
}
impl SyncManager {
    /// Creates a manager for the given database and instance id, returning it
    /// together with a receiver for [`SyncMessage`] broadcasts.
    ///
    /// The HLC is seeded with the instance id so timestamps minted by
    /// different instances are distinguishable.
    pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> (Self, Receiver<SyncMessage>) {
        // Capacity 64: broadcast semantics — lagging receivers may drop
        // messages rather than block senders.
        let (tx, rx) = broadcast::channel(64);

        (
            Self {
                db: db.clone(),
                instance,
                clock: HLCBuilder::new().with_id(instance.into()).build(),
                _clocks: Default::default(),
                tx,
            },
            rx,
        )
    }

    /// Runs the caller's batch `queries`, and — only when the `emit-messages`
    /// feature is enabled — also writes every operation in `_ops` to the
    /// operation log in the same batch and broadcasts a `Created` message for
    /// each. With the feature disabled, `_ops` is ignored entirely.
    pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>(
        &self,
        tx: &PrismaClient,
        (_ops, queries): (Vec<CRDTOperation>, I),
    ) -> prisma_client_rust::Result<<I as prisma_client_rust::BatchItemParent>::ReturnValue> {
        #[cfg(feature = "emit-messages")]
        let res = {
            // Collects the log-insert queries for all ops of one
            // CRDTOperationType variant.
            macro_rules! variant {
                ($var:ident, $variant:ident, $fn:ident) => {
                    let $var = _ops
                        .iter()
                        .filter_map(|op| match &op.typ {
                            CRDTOperationType::$variant(inner) => {
                                Some($fn(&op, &inner).to_query(tx))
                            }
                            _ => None,
                        })
                        .collect::<Vec<_>>();
                };
            }

            variant!(shared, Shared, shared_op_db);
            variant!(relation, Relation, relation_op_db);

            // Caller queries and operation-log writes run in one atomic batch.
            let (res, _) = tx._batch((queries, (shared, relation))).await?;

            // Best-effort broadcast; an Err only means there are no receivers.
            for op in _ops {
                self.tx.send(SyncMessage::Created(op)).ok();
            }

            res
        };
        #[cfg(not(feature = "emit-messages"))]
        let res = tx._batch([queries]).await?.remove(0);

        Ok(res)
    }

    /// Single-operation variant of [`Self::write_ops`]: runs `query` and,
    /// when `emit-messages` is enabled, writes `op` to the operation log in
    /// the same batch and broadcasts `Created`.
    #[allow(unused_variables)]
    pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>(
        &self,
        tx: &PrismaClient,
        op: CRDTOperation,
        query: Q,
    ) -> prisma_client_rust::Result<<Q as prisma_client_rust::BatchItemParent>::ReturnValue> {
        #[cfg(feature = "emit-messages")]
        let ret = {
            // Batches the log insert with the caller's query and keeps only
            // the caller query's result (`.1`).
            macro_rules! exec {
                ($fn:ident, $inner:ident) => {
                    tx._batch(($fn(&op, $inner).to_query(tx), query)).await?.1
                };
            }

            let ret = match &op.typ {
                CRDTOperationType::Shared(inner) => exec!(shared_op_db, inner),
                CRDTOperationType::Relation(inner) => exec!(relation_op_db, inner),
            };

            self.tx.send(SyncMessage::Created(op)).ok();

            ret
        };
        #[cfg(not(feature = "emit-messages"))]
        let ret = tx._batch(vec![query]).await?.remove(0);

        Ok(ret)
    }

    /// Loads every stored shared and relation operation, merged and ordered
    /// by timestamp (newest first — see the `.rev()` at the end).
    pub async fn get_ops(&self) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
        let Self { db, .. } = self;

        // Include the owning instance's pub_id so each row can be mapped
        // back to the instance UUID.
        shared_operation::include!(shared_include {
            instance: select { pub_id }
        });
        relation_operation::include!(relation_include {
            instance: select { pub_id }
        });

        // Unifies the two operation tables so they can share one ordering.
        enum DbOperation {
            Shared(shared_include::Data),
            Relation(relation_include::Data),
        }

        impl DbOperation {
            fn timestamp(&self) -> NTP64 {
                NTP64(match self {
                    Self::Shared(op) => op.timestamp,
                    Self::Relation(op) => op.timestamp,
                } as u64)
            }

            fn id(&self) -> Uuid {
                Uuid::from_slice(match self {
                    Self::Shared(op) => &op.id,
                    Self::Relation(op) => &op.id,
                })
                .unwrap()
            }

            fn instance(&self) -> Uuid {
                Uuid::from_slice(match self {
                    Self::Shared(op) => &op.instance.pub_id,
                    Self::Relation(op) => &op.instance.pub_id,
                })
                .unwrap()
            }

            // Deserializes the stored blobs back into a CRDTOperation.
            fn into_operation(self) -> CRDTOperation {
                CRDTOperation {
                    id: self.id(),
                    instance: self.instance(),
                    timestamp: self.timestamp(),
                    typ: match self {
                        Self::Shared(op) => CRDTOperationType::Shared(SharedOperation {
                            record_id: serde_json::from_slice(&op.record_id).unwrap(),
                            model: op.model,
                            data: serde_json::from_slice(&op.data).unwrap(),
                        }),
                        Self::Relation(op) => CRDTOperationType::Relation(RelationOperation {
                            relation: op.relation,
                            data: serde_json::from_slice(&op.data).unwrap(),
                            relation_item: serde_json::from_slice(&op.item_id).unwrap(),
                            relation_group: serde_json::from_slice(&op.group_id).unwrap(),
                        }),
                    },
                }
            }
        }

        let (shared, relation) = db
            ._batch((
                db.shared_operation()
                    .find_many(vec![])
                    .include(shared_include::include()),
                db.relation_operation()
                    .find_many(vec![])
                    .include(relation_include::include()),
            ))
            .await?;

        // BTreeMap keyed by timestamp gives a merged, timestamp-sorted view.
        // NOTE(review): two operations with an identical timestamp collide on
        // the key and one silently overwrites the other — confirm timestamps
        // are unique across both tables.
        let mut ops = BTreeMap::new();
        ops.extend(
            shared
                .into_iter()
                .map(DbOperation::Shared)
                .map(|op| (op.timestamp(), op)),
        );
        ops.extend(
            relation
                .into_iter()
                .map(DbOperation::Relation)
                .map(|op| (op.timestamp(), op)),
        );

        Ok(ops
            .into_values()
            .map(DbOperation::into_operation)
            .rev()
            .collect())
    }

    /// Applies `op` to the model tables, records it in the operation log, and
    /// broadcasts an `Ingested` message.
    pub async fn apply_op(&self, op: CRDTOperation) -> prisma_client_rust::Result<()> {
        // Apply the operation's effect to the actual model tables first…
        ModelSyncData::from_op(op.typ.clone())
            .unwrap()
            .exec(&self.db)
            .await?;

        // …then append the operation itself to the matching log table.
        match &op.typ {
            CRDTOperationType::Shared(shared_op) => {
                shared_op_db(&op, shared_op)
                    .to_query(&self.db)
                    .exec()
                    .await?;
            }
            CRDTOperationType::Relation(relation_op) => {
                relation_op_db(&op, relation_op)
                    .to_query(&self.db)
                    .exec()
                    .await?;
            }
        }

        // NOTE(review): `op` is not used after this point — the clone here is
        // unnecessary; `send(SyncMessage::Ingested(op))` would do.
        self.tx.send(SyncMessage::Ingested(op.clone())).ok();

        Ok(())
    }

    /// Returns `true` when the operation log already holds a *different*
    /// operation for the same record/kind with a timestamp >= `op`'s — i.e.
    /// `op` is stale and must not be applied. An exact-timestamp match maps
    /// to `false` (presumably meaning "this very op"); confirm that equal
    /// timestamps can only come from the same operation.
    async fn compare_message(&self, op: &CRDTOperation) -> bool {
        let old_timestamp = match &op.typ {
            CRDTOperationType::Shared(shared_op) => {
                // Newest stored shared op for the same model/record/kind at
                // or after `op`'s timestamp, if any.
                let newer_op = self
                    .db
                    .shared_operation()
                    .find_first(vec![
                        shared_operation::timestamp::gte(op.timestamp.as_u64() as i64),
                        shared_operation::model::equals(shared_op.model.to_string()),
                        shared_operation::record_id::equals(
                            serde_json::to_vec(&shared_op.record_id).unwrap(),
                        ),
                        shared_operation::kind::equals(shared_op.kind().to_string()),
                    ])
                    .order_by(shared_operation::timestamp::order(SortOrder::Desc))
                    .exec()
                    .await
                    .unwrap();

                newer_op.map(|newer_op| newer_op.timestamp)
            }
            CRDTOperationType::Relation(relation_op) => {
                // Same check, but relation ops are keyed by relation/item/kind.
                let newer_op = self
                    .db
                    .relation_operation()
                    .find_first(vec![
                        relation_operation::timestamp::gte(op.timestamp.as_u64() as i64),
                        relation_operation::relation::equals(relation_op.relation.to_string()),
                        relation_operation::item_id::equals(
                            serde_json::to_vec(&relation_op.relation_item).unwrap(),
                        ),
                        relation_operation::kind::equals(relation_op.kind().to_string()),
                    ])
                    .order_by(relation_operation::timestamp::order(SortOrder::Desc))
                    .exec()
                    .await
                    .unwrap();

                newer_op.map(|newer_op| newer_op.timestamp)
            }
        };

        old_timestamp
            .map(|old| old != op.timestamp.as_u64() as i64)
            .unwrap_or_default()
    }

    /// Ingests an operation received from another instance: advances the
    /// local HLC, applies the op unless it is stale, and persists the
    /// sender's latest timestamp on its `instance` row.
    pub async fn receive_crdt_operation(&mut self, op: CRDTOperation) {
        // Fold the remote timestamp into our HLC so timestamps we mint next
        // are causally after it.
        self.clock
            .update_with_timestamp(&Timestamp::new(op.timestamp, op.instance.into()))
            .ok();

        // Track the highest timestamp seen from this instance in memory.
        let timestamp = self
            ._clocks
            .entry(op.instance)
            .or_insert_with(|| op.timestamp);

        if *timestamp < op.timestamp {
            *timestamp = op.timestamp;
        }

        let op_timestamp = op.timestamp;
        let op_instance = op.instance;

        // Skip ops already superseded by a newer stored op (see
        // compare_message); application errors are deliberately swallowed.
        let is_old = self.compare_message(&op).await;

        if !is_old {
            self.apply_op(op).await.ok();
        }

        // Best-effort: persist the sender instance's latest timestamp.
        self.db
            .instance()
            .update(
                instance::pub_id::equals(uuid_to_bytes(op_instance)),
                vec![instance::timestamp::set(Some(op_timestamp.as_u64() as i64))],
            )
            .exec()
            .await
            .ok();
    }
}
/// Converts a CRDT operation carrying a shared payload into the `Create`
/// record for the `shared_operation` log table.
fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> shared_operation::Create {
    // Serialized copies of the payload and the record identifier.
    let data = to_vec(&shared_op.data).unwrap();
    let record_id = to_vec(&shared_op.record_id).unwrap();

    shared_operation::Create {
        id: op.id.as_bytes().to_vec(),
        timestamp: op.timestamp.0 as i64,
        instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
        kind: shared_op.kind().to_string(),
        model: shared_op.model.to_string(),
        record_id,
        data,
        _params: vec![],
    }
}
/// Converts a CRDT operation carrying a relation payload into the `Create`
/// record for the `relation_operation` log table.
fn relation_op_db(
    op: &CRDTOperation,
    relation_op: &RelationOperation,
) -> relation_operation::Create {
    // Serialized copies of the payload and the (item, group) identifiers.
    let data = to_vec(&relation_op.data).unwrap();
    let item_id = to_vec(&relation_op.relation_item).unwrap();
    let group_id = to_vec(&relation_op.relation_group).unwrap();

    relation_operation::Create {
        id: op.id.as_bytes().to_vec(),
        timestamp: op.timestamp.0 as i64,
        instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
        kind: relation_op.kind().to_string(),
        relation: relation_op.relation.to_string(),
        item_id,
        group_id,
        data,
        _params: vec![],
    }
}
/// Lets the manager mint new CRDT operations via `sd_sync::OperationFactory`,
/// stamping them with its HLC clock and this node's instance id.
impl OperationFactory for SyncManager {
    fn get_clock(&self) -> &HLC {
        &self.clock
    }

    fn get_instance(&self) -> Uuid {
        self.instance
    }
}

View file

@ -0,0 +1,15 @@
-- AlterTable
-- Nullable so existing `instance` rows migrate without a value; sync updates
-- it as operations from each instance are received.
ALTER TABLE "instance" ADD COLUMN "timestamp" BIGINT;

-- CreateTable
-- Log of relation CRDT operations; mirrors `shared_operation` but identifies
-- the target record by an (item_id, group_id) pair instead of a single
-- record id.
CREATE TABLE "relation_operation" (
    "id" BLOB NOT NULL PRIMARY KEY,
    "timestamp" BIGINT NOT NULL,
    "relation" TEXT NOT NULL,
    "item_id" BLOB NOT NULL,
    "group_id" BLOB NOT NULL,
    "kind" TEXT NOT NULL,
    "data" BLOB NOT NULL,
    "instance_id" INTEGER NOT NULL,
    CONSTRAINT "relation_operation_instance_id_fkey" FOREIGN KEY ("instance_id") REFERENCES "instance" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
);

View file

@ -36,6 +36,23 @@ model SharedOperation {
@@map("shared_operation")
}
model RelationOperation {
id Bytes @id
timestamp BigInt
relation String
item_id Bytes
group_id Bytes
kind String
data Bytes
instance_id Int
instance Instance @relation(fields: [instance_id], references: [id])
@@map("relation_operation")
}
/// @deprecated: This model has to exist solely for backwards compatibility.
model Node {
id Int @id @default(autoincrement())
@ -67,9 +84,13 @@ model Instance {
last_seen DateTime // Time core started for owner, last P2P message for P2P node
date_created DateTime
// clock timestamp for sync
timestamp BigInt?
// attestation Bytes
SharedOperation SharedOperation[]
RelationOperation RelationOperation[]
@@map("instance")
}

View file

@ -94,6 +94,8 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.create(args.name, None, ctx.config.get().await)
.await?;
debug!("Created library {}", new_library.uuid);
Ok(new_library)
})
})

View file

@ -80,7 +80,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
ctx.p2p
.pairing
.clone()
.originator(id, ctx.config.get().await)
.originator(id, ctx.config.get().await, ctx.library_manager.clone())
.await
})
})

View file

@ -10,7 +10,6 @@ use crate::{
},
object::preview::get_thumb_key,
prisma::{self, file_path, location, object, tag, tag_on_object, PrismaClient},
util::db::chain_optional_iter,
};
use std::collections::BTreeSet;
@ -159,7 +158,7 @@ impl FilePathFilterArgs {
{
use file_path::*;
Ok(chain_optional_iter(
Ok(sd_utils::chain_optional_iter(
self.search
.unwrap_or_default()
.split(' ')
@ -248,7 +247,7 @@ impl ObjectFilterArgs {
fn into_params(self) -> Vec<object::WhereParam> {
use object::*;
chain_optional_iter(
sd_utils::chain_optional_iter(
[],
[
self.hidden.to_param(),

View file

@ -1,6 +1,5 @@
use rspc::alpha::AlphaRouter;
use crate::sync::SyncMessage;
use sd_core_sync::SyncMessage;
use super::{utils::library, Ctx, R};

View file

@ -1,5 +1,7 @@
use chrono::Utc;
use rspc::{alpha::AlphaRouter, ErrorCode};
use sd_prisma::prisma_sync;
use sd_sync::OperationFactory;
use serde::Deserialize;
use specta::Type;
@ -9,8 +11,7 @@ use crate::{
invalidate_query,
library::Library,
object::tag::TagCreateArgs,
prisma::{tag, tag_on_object},
sync::{self, OperationFactory},
prisma::{object, tag, tag_on_object},
};
use super::{utils::library, Ctx, R};
@ -66,29 +67,72 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
R.with2(library())
.mutation(|(_, library), args: TagAssignArgs| async move {
let Library { db, .. } = library.as_ref();
let Library { db, sync, .. } = library.as_ref();
let (tag, objects) = db
._batch((
db.tag()
.find_unique(tag::id::equals(args.tag_id))
.select(tag::select!({ pub_id })),
db.object()
.find_many(vec![object::id::in_vec(args.object_ids)])
.select(object::select!({ id pub_id })),
))
.await?;
let tag = tag.ok_or_else(|| {
rspc::Error::new(ErrorCode::NotFound, "Tag not found".to_string())
})?;
macro_rules! sync_id {
($object:ident) => {
prisma_sync::tag_on_object::SyncId {
tag: prisma_sync::tag::SyncId {
pub_id: tag.pub_id.clone(),
},
object: prisma_sync::object::SyncId {
pub_id: $object.pub_id.clone(),
},
}
};
}
if args.unassign {
db.tag_on_object()
.delete_many(vec![
tag_on_object::tag_id::equals(args.tag_id),
tag_on_object::object_id::in_vec(args.object_ids),
])
.exec()
.await?;
} else {
db.tag_on_object()
.create_many(
args.object_ids
.iter()
.map(|&object_id| tag_on_object::CreateUnchecked {
tag_id: args.tag_id,
object_id,
_params: vec![],
})
let query = db.tag_on_object().delete_many(vec![
tag_on_object::tag_id::equals(args.tag_id),
tag_on_object::object_id::in_vec(
objects.iter().map(|o| o.id).collect(),
),
]);
sync.write_ops(
db,
(
objects
.into_iter()
.map(|object| sync.relation_delete(sync_id!(object)))
.collect(),
)
.exec()
query,
),
)
.await?;
} else {
let (sync_ops, db_creates) = objects.into_iter().fold(
(vec![], vec![]),
|(mut sync_ops, mut db_creates), object| {
db_creates.push(tag_on_object::CreateUnchecked {
tag_id: args.tag_id,
object_id: object.id,
_params: vec![],
});
sync_ops.extend(sync.relation_create(sync_id!(object), []));
(sync_ops, db_creates)
},
);
sync.write_ops(db, (sync_ops, db.tag_on_object().create_many(db_creates)))
.await?;
}
@ -139,7 +183,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.flatten()
.map(|(k, v)| {
sync.shared_update(
sync::tag::SyncId {
prisma_sync::tag::SyncId {
pub_id: tag.pub_id.clone(),
},
k,

View file

@ -1,7 +1,7 @@
use crate::{
library::Library,
prisma::job,
util::db::{chain_optional_iter, maybe_missing, MissingFieldError},
util::db::{maybe_missing, MissingFieldError},
};
use std::fmt::{Display, Formatter};
@ -203,7 +203,7 @@ impl JobReport {
.job()
.create(
self.id.as_bytes().to_vec(),
chain_optional_iter(
sd_utils::chain_optional_iter(
[
job::name::set(Some(self.name.clone())),
job::action::set(self.action.clone()),

View file

@ -39,7 +39,6 @@ pub(crate) mod node;
pub(crate) mod object;
pub(crate) mod p2p;
pub(crate) mod preferences;
pub(crate) mod sync;
pub(crate) mod util;
pub(crate) mod volume;
@ -47,6 +46,7 @@ pub struct NodeContext {
pub config: Arc<NodeConfigManager>,
pub job_manager: Arc<JobManager>,
pub location_manager: Arc<LocationManager>,
pub p2p: Arc<P2PManager>,
pub event_bus_tx: broadcast::Sender<CoreEvent>,
pub notifications: Arc<NotificationManager>,
}
@ -81,28 +81,32 @@ impl Node {
debug!("Initialised 'NodeConfigManager'...");
let job_manager = JobManager::new();
debug!("Initialised 'JobManager'...");
let notifications = NotificationManager::new();
debug!("Initialised 'NotificationManager'...");
let location_manager = LocationManager::new();
debug!("Initialised 'LocationManager'...");
let (p2p, p2p_stream) = P2PManager::new(config.clone()).await?;
debug!("Initialised 'P2PManager'...");
let library_manager = LibraryManager::new(
data_dir.join("libraries"),
Arc::new(NodeContext {
config: config.clone(),
job_manager: job_manager.clone(),
location_manager: location_manager.clone(),
// p2p: p2p.clone(),
p2p: p2p.clone(),
event_bus_tx: event_bus.0.clone(),
notifications: notifications.clone(),
}),
)
.await?;
debug!("Initialised 'LibraryManager'...");
let p2p = P2PManager::new(config.clone(), library_manager.clone()).await?;
debug!("Initialised 'P2PManager'...");
p2p.start(p2p_stream, library_manager.clone());
#[cfg(debug_assertions)]
if let Some(init_data) = init_data {

View file

@ -2,7 +2,7 @@ use crate::{
node::{NodeConfig, Platform},
prisma::{file_path, indexer_rule, PrismaClient},
util::{
db::{maybe_missing, uuid_to_bytes},
db::maybe_missing,
migrator::{Migrate, MigratorError},
},
};
@ -65,9 +65,9 @@ impl Migrate for LibraryConfig {
.map(|(i, name)| {
db.indexer_rule().update_many(
vec![indexer_rule::name::equals(Some(name))],
vec![indexer_rule::pub_id::set(uuid_to_bytes(Uuid::from_u128(
i as u128,
)))],
vec![indexer_rule::pub_id::set(sd_utils::uuid_to_bytes(
Uuid::from_u128(i as u128),
))],
)
})
.collect::<Vec<_>>(),
@ -185,6 +185,7 @@ impl Migrate for LibraryConfig {
node_platform: Platform::current() as i32,
last_seen: now,
date_created: node.map(|n| n.date_created).unwrap_or_else(|| now),
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![],
}
.to_query(db)

View file

@ -9,11 +9,11 @@ use crate::{
},
node::NodeConfigManager,
object::{
orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path,
orphan_remover::OrphanRemoverActor,
preview::{get_thumbnail_path, THUMBNAIL_CACHE_DIR_NAME},
thumbnail_remover::ThumbnailRemoverActor,
},
prisma::{file_path, location, PrismaClient},
sync::SyncManager,
util::{db::maybe_missing, error::FileIOError},
NodeContext,
};
@ -26,13 +26,14 @@ use std::{
};
use chrono::{DateTime, Utc};
use sd_core_sync::{SyncManager, SyncMessage};
use sd_p2p::spacetunnel::Identity;
use sd_prisma::prisma::notification;
use tokio::{fs, io};
use tracing::warn;
use uuid::Uuid;
use super::{LibraryConfig, LibraryManagerError};
use super::{LibraryConfig, LibraryManager, LibraryManagerError};
/// LibraryContext holds context for a library which can be passed around the application.
pub struct Library {
@ -42,7 +43,7 @@ pub struct Library {
pub config: LibraryConfig,
/// db holds the database client for the current library.
pub db: Arc<PrismaClient>,
pub sync: Arc<SyncManager>,
pub sync: Arc<sd_core_sync::SyncManager>,
/// key manager that provides encryption keys to functions that require them
// pub key_manager: Arc<KeyManager>,
/// node_context holds the node context for the node which this library is running on.
@ -66,6 +67,53 @@ impl Debug for Library {
}
impl Library {
pub fn new(
id: Uuid,
instance_id: Uuid,
config: LibraryConfig,
identity: Arc<Identity>,
db: Arc<PrismaClient>,
library_manager: Arc<LibraryManager>,
// node_context: Arc<NodeContext>,
) -> Self {
let (sync_manager, mut sync_rx) = SyncManager::new(&db, instance_id);
let node_context = library_manager.node_context.clone();
let library = Self {
orphan_remover: OrphanRemoverActor::spawn(db.clone()),
thumbnail_remover: ThumbnailRemoverActor::spawn(
db.clone(),
node_context
.config
.data_directory()
.join(THUMBNAIL_CACHE_DIR_NAME),
),
id,
db,
config,
node_context,
// key_manager,
sync: Arc::new(sync_manager),
identity: identity.clone(),
};
tokio::spawn({
async move {
while let Ok(op) = sync_rx.recv().await {
let SyncMessage::Created(op) = op else { continue; };
library_manager
.node_context
.p2p
.broadcast_sync_events(id, &identity, vec![op], &library_manager)
.await;
}
}
});
library
}
pub(crate) fn emit(&self, event: CoreEvent) {
if let Err(e) = self.node_context.event_bus_tx.send(event) {
warn!("Error sending event to event bus: {e:?}");

View file

@ -2,12 +2,8 @@ use crate::{
invalidate_query,
location::{indexer, LocationManagerError},
node::{NodeConfig, Platform},
object::{
orphan_remover::OrphanRemoverActor, preview::THUMBNAIL_CACHE_DIR_NAME, tag,
thumbnail_remover::ThumbnailRemoverActor,
},
object::tag,
prisma::location,
sync::{SyncManager, SyncMessage},
util::{
db::{self, MissingFieldError},
error::{FileIOError, NonUtf8PathError},
@ -27,33 +23,12 @@ use chrono::Utc;
use sd_p2p::spacetunnel::{Identity, IdentityErr};
use sd_prisma::prisma::instance;
use thiserror::Error;
use tokio::{
fs, io,
sync::{broadcast, RwLock},
try_join,
};
use tokio::{fs, io, sync::RwLock, try_join};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use super::{Library, LibraryConfig, LibraryConfigWrapped, LibraryName};
pub enum SubscriberEvent {
Load(Uuid, Arc<Identity>, broadcast::Receiver<SyncMessage>),
}
impl Clone for SubscriberEvent {
fn clone(&self) -> Self {
match self {
Self::Load(id, identity, receiver) => {
Self::Load(*id, identity.clone(), receiver.resubscribe())
}
}
}
}
pub trait SubscriberFn: Fn(SubscriberEvent) + Send + Sync + 'static {}
impl<F: Fn(SubscriberEvent) + Send + Sync + 'static> SubscriberFn for F {}
/// LibraryManager is a singleton that manages all libraries for a node.
pub struct LibraryManager {
/// libraries_dir holds the path to the directory where libraries are stored.
@ -61,9 +36,7 @@ pub struct LibraryManager {
/// libraries holds the list of libraries which are currently loaded into the node.
libraries: RwLock<Vec<Arc<Library>>>,
/// node_context holds the context for the node which this library manager is running on.
node_context: Arc<NodeContext>,
/// on load subscribers
subscribers: RwLock<Vec<Box<dyn SubscriberFn>>>,
pub node_context: Arc<NodeContext>,
}
#[derive(Error, Debug)]
@ -121,12 +94,16 @@ impl LibraryManager {
.await
.map_err(|e| FileIOError::from((&libraries_dir, e)))?;
let mut libraries = Vec::new();
let subscribers = RwLock::new(Vec::new());
let mut read_dir = fs::read_dir(&libraries_dir)
.await
.map_err(|e| FileIOError::from((&libraries_dir, e)))?;
let this = Arc::new(Self {
libraries_dir: libraries_dir.clone(),
libraries: Default::default(),
node_context,
});
while let Some(entry) = read_dir
.next_entry()
.await
@ -164,44 +141,17 @@ impl LibraryManager {
Err(e) => return Err(FileIOError::from((db_path, e)).into()),
}
libraries.push(
Self::load(
library_id,
&db_path,
config_path,
node_context.clone(),
&subscribers,
None,
true,
)
.await?,
);
this.load(library_id, &db_path, config_path, None, true)
.await?;
}
}
Ok(Arc::new(Self {
libraries: RwLock::new(libraries),
libraries_dir,
node_context,
subscribers,
}))
}
/// subscribe to library events
pub(crate) async fn subscribe<F: SubscriberFn>(&self, f: F) {
self.subscribers.write().await.push(Box::new(f));
}
async fn emit(subscribers: &RwLock<Vec<Box<dyn SubscriberFn>>>, event: SubscriberEvent) {
let subscribers = subscribers.read().await;
for subscriber in subscribers.iter() {
subscriber(event.clone());
}
Ok(this)
}
/// create creates a new library with the given config and mounts it into the running [LibraryManager].
pub(crate) async fn create(
&self,
self: &Arc<Self>,
name: LibraryName,
description: Option<String>,
node_cfg: NodeConfig,
@ -211,7 +161,7 @@ impl LibraryManager {
}
pub(crate) async fn create_with_uuid(
&self,
self: &Arc<Self>,
id: Uuid,
name: LibraryName,
description: Option<String>,
@ -240,25 +190,25 @@ impl LibraryManager {
);
let now = Utc::now().fixed_offset();
let library = Self::load(
id,
self.libraries_dir.join(format!("{id}.db")),
config_path,
self.node_context.clone(),
&self.subscribers,
Some(instance::Create {
pub_id: Uuid::new_v4().as_bytes().to_vec(),
identity: Identity::new().to_bytes(),
node_id: node_cfg.id.as_bytes().to_vec(),
node_name: node_cfg.name.clone(),
node_platform: Platform::current() as i32,
last_seen: now,
date_created: now,
_params: vec![instance::id::set(config.instance_id)],
}),
should_seed,
)
.await?;
let library = self
.load(
id,
self.libraries_dir.join(format!("{id}.db")),
config_path,
Some(instance::Create {
pub_id: Uuid::new_v4().as_bytes().to_vec(),
identity: Identity::new().to_bytes(),
node_id: node_cfg.id.as_bytes().to_vec(),
node_name: node_cfg.name.clone(),
node_platform: Platform::current() as i32,
last_seen: now,
date_created: now,
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![instance::id::set(config.instance_id)],
}),
should_seed,
)
.await?;
debug!("Loaded library '{id:?}'");
@ -270,8 +220,6 @@ impl LibraryManager {
invalidate_query!(library, "library.list");
self.libraries.write().await.push(library);
debug!("Pushed library into manager '{id:?}'");
Ok(LibraryConfigWrapped { uuid: id, config })
@ -293,9 +241,9 @@ impl LibraryManager {
.collect()
}
pub(crate) async fn get_all_instances(&self) -> Vec<instance::Data> {
vec![] // TODO: Cache in memory
}
// pub(crate) async fn get_all_instances(&self) -> Vec<instance::Data> {
// vec![] // TODO: Cache in memory
// }
pub(crate) async fn edit(
&self,
@ -396,11 +344,10 @@ impl LibraryManager {
/// load the library from a given path
async fn load(
self: &Arc<Self>,
id: Uuid,
db_path: impl AsRef<Path>,
config_path: PathBuf,
node_context: Arc<NodeContext>,
subscribers: &RwLock<Vec<Box<dyn SubscriberFn>>>,
create: Option<instance::Create>,
should_seed: bool,
) -> Result<Arc<Library>, LibraryManagerError> {
@ -417,7 +364,7 @@ impl LibraryManager {
create.to_query(&db).exec().await?;
}
let node_config = node_context.config.get().await;
let node_config = self.node_context.config.get().await;
let config =
LibraryConfig::load_and_migrate(&config_path, &(node_config.clone(), db.clone()))
.await?;
@ -462,31 +409,15 @@ impl LibraryManager {
// let key_manager = Arc::new(KeyManager::new(vec![]).await?);
// seed_keymanager(&db, &key_manager).await?;
let (sync_manager, sync_rx) = SyncManager::new(&db, instance_id);
Self::emit(
subscribers,
SubscriberEvent::Load(id, identity.clone(), sync_rx),
)
.await;
let library = Arc::new(Library {
let library = Arc::new(Library::new(
id,
instance_id,
config,
// key_manager,
sync: Arc::new(sync_manager),
orphan_remover: OrphanRemoverActor::spawn(db.clone()),
thumbnail_remover: ThumbnailRemoverActor::spawn(
db.clone(),
node_context
.config
.data_directory()
.join(THUMBNAIL_CACHE_DIR_NAME),
),
db,
node_context,
identity,
});
// key_manager,
db,
self.clone(),
));
if should_seed {
library.orphan_remover.invoke().await;

View file

@ -182,9 +182,8 @@ pub async fn create_file_path(
metadata: FilePathMetadata,
) -> Result<file_path::Data, FilePathError> {
use crate::util::db::{device_to_db, inode_to_db};
use crate::{sync, util::db::uuid_to_bytes};
use sd_prisma::prisma;
use sd_prisma::{prisma, prisma_sync};
use sd_sync::OperationFactory;
use serde_json::json;
use uuid::Uuid;
@ -203,7 +202,7 @@ pub async fn create_file_path(
vec![
(
location::NAME,
json!(sync::location::SyncId {
json!(prisma_sync::location::SyncId {
pub_id: location.pub_id
}),
),
@ -223,14 +222,14 @@ pub async fn create_file_path(
]
};
let pub_id = uuid_to_bytes(Uuid::new_v4());
let pub_id = sd_utils::uuid_to_bytes(Uuid::new_v4());
let created_path = sync
.write_ops(
db,
(
sync.shared_create(
sync::file_path::SyncId {
prisma_sync::file_path::SyncId {
pub_id: pub_id.clone(),
},
params,

View file

@ -1,9 +1,8 @@
use crate::{
library::Library,
prisma::{file_path, location, PrismaClient},
sync,
util::{
db::{device_to_db, inode_to_db, uuid_to_bytes},
db::{device_to_db, inode_to_db},
error::FileIOError,
},
};
@ -12,6 +11,7 @@ use std::path::Path;
use chrono::Utc;
use rspc::ErrorCode;
use sd_prisma::prisma_sync;
use sd_sync::*;
use serde::{Deserialize, Serialize};
use serde_json::json;
@ -103,13 +103,13 @@ async fn execute_indexer_save_step(
use file_path::*;
let pub_id = uuid_to_bytes(entry.pub_id);
let pub_id = sd_utils::uuid_to_bytes(entry.pub_id);
let (sync_params, db_params): (Vec<_>, Vec<_>) = [
(
(
location::NAME,
json!(sync::location::SyncId {
json!(prisma_sync::location::SyncId {
pub_id: pub_id.clone()
}),
),
@ -160,8 +160,8 @@ async fn execute_indexer_save_step(
(
sync.shared_create(
sync::file_path::SyncId {
pub_id: uuid_to_bytes(entry.pub_id),
prisma_sync::file_path::SyncId {
pub_id: sd_utils::uuid_to_bytes(entry.pub_id),
},
sync_params,
),
@ -199,7 +199,7 @@ async fn execute_indexer_update_step(
use file_path::*;
let pub_id = uuid_to_bytes(entry.pub_id);
let pub_id = sd_utils::uuid_to_bytes(entry.pub_id);
let (sync_params, db_params): (Vec<_>, Vec<_>) = [
// As this file was updated while Spacedrive was offline, we mark the object_id as null
@ -243,7 +243,7 @@ async fn execute_indexer_update_step(
.into_iter()
.map(|(field, value)| {
sync.shared_update(
sync::file_path::SyncId {
prisma_sync::file_path::SyncId {
pub_id: pub_id.clone(),
},
field,

View file

@ -4,7 +4,7 @@ use crate::{
library::Library,
prisma::indexer_rule,
util::{
db::{maybe_missing, uuid_to_bytes, MissingFieldError},
db::{maybe_missing, MissingFieldError},
error::{FileIOError, NonUtf8PathError},
},
};
@ -135,7 +135,7 @@ impl IndexerRuleCreateArgs {
.db
.indexer_rule()
.create(
uuid_to_bytes(generate_pub_id()),
sd_utils::uuid_to_bytes(generate_pub_id()),
vec![
name::set(Some(self.name)),
rules_per_kind::set(Some(rules_data)),

View file

@ -1,7 +1,6 @@
use crate::{
library::Library,
location::indexer::rules::{IndexerRuleError, RulePerKind},
util::db::uuid_to_bytes,
};
use chrono::Utc;
use sd_prisma::prisma::indexer_rule;
@ -29,7 +28,7 @@ pub async fn new_or_existing_library(library: &Library) -> Result<(), SeederErro
.into_iter()
.enumerate()
{
let pub_id = uuid_to_bytes(Uuid::from_u128(i as u128));
let pub_id = sd_utils::uuid_to_bytes(Uuid::from_u128(i as u128));
let rules = rmp_serde::to_vec_named(&rule.rules).map_err(IndexerRuleError::from)?;
use indexer_rule::*;

View file

@ -5,7 +5,7 @@ use crate::{
},
prisma::file_path,
util::{
db::{device_from_db, from_bytes_to_uuid, inode_from_db},
db::{device_from_db, inode_from_db},
error::FileIOError,
},
};
@ -385,7 +385,9 @@ where
) - *date_modified
> Duration::milliseconds(1)
{
to_update.push((from_bytes_to_uuid(&file_path.pub_id), entry).into());
to_update.push(
(sd_utils::from_bytes_to_uuid(&file_path.pub_id), entry).into(),
);
}
}

View file

@ -20,7 +20,6 @@ use crate::{
validation::hash::file_checksum,
},
prisma::{file_path, location, object},
sync::{self, OperationFactory},
util::{
db::{device_from_db, device_to_db, inode_from_db, inode_to_db, maybe_missing},
error::FileIOError,
@ -47,6 +46,8 @@ use sd_file_ext::extensions::ImageExtension;
use chrono::{DateTime, Local, Utc};
use notify::Event;
use prisma_client_rust::{raw, PrismaValue};
use sd_prisma::prisma_sync;
use sd_sync::OperationFactory;
use serde_json::json;
use tokio::{fs, io::ErrorKind};
use tracing::{debug, error, trace, warn};
@ -508,7 +509,7 @@ async fn inner_update_file(
.into_iter()
.map(|(field, value)| {
sync.shared_update(
sync::file_path::SyncId {
prisma_sync::file_path::SyncId {
pub_id: file_path.pub_id.clone(),
},
field,
@ -544,7 +545,7 @@ async fn inner_update_file(
sync.write_op(
db,
sync.shared_update(
sync::object::SyncId {
prisma_sync::object::SyncId {
pub_id: object.pub_id.clone(),
},
object::kind::NAME,

View file

@ -8,8 +8,7 @@ use crate::{
preview::{shallow_thumbnailer, thumbnailer_job::ThumbnailerJobInit},
},
prisma::{file_path, indexer_rules_in_location, location, PrismaClient},
sync,
util::{db::chain_optional_iter, error::FileIOError},
util::error::FileIOError,
};
use std::{
@ -22,6 +21,7 @@ use chrono::Utc;
use futures::future::TryFutureExt;
use normpath::PathExt;
use prisma_client_rust::{operator::and, or, QueryError};
use sd_prisma::prisma_sync;
use sd_sync::*;
use serde::Deserialize;
use serde_json::json;
@ -274,7 +274,7 @@ impl LocationUpdateArgs {
.into_iter()
.map(|p| {
sync.shared_update(
sync::location::SyncId {
prisma_sync::location::SyncId {
pub_id: location.pub_id.clone(),
},
p.0,
@ -483,7 +483,7 @@ pub async fn relink_location(
sync.write_op(
db,
sync.shared_update(
sync::location::SyncId {
prisma_sync::location::SyncId {
pub_id: pub_id.clone(),
},
location::path::NAME,
@ -588,7 +588,7 @@ async fn create_location(
db,
(
sync.shared_create(
sync::location::SyncId {
prisma_sync::location::SyncId {
pub_id: location_pub_id.as_bytes().to_vec(),
},
[
@ -597,7 +597,7 @@ async fn create_location(
(location::date_created::NAME, json!(date_created)),
(
location::instance_id::NAME,
json!(sync::instance::SyncId {
json!(prisma_sync::instance::SyncId {
pub_id: vec![],
// id: library.config.instance_id,
}),
@ -697,7 +697,7 @@ pub async fn delete_directory(
) -> Result<(), QueryError> {
let Library { db, .. } = library;
let children_params = chain_optional_iter(
let children_params = sd_utils::chain_optional_iter(
[file_path::location_id::equals(Some(location_id))],
[parent_iso_file_path.and_then(|parent| {
parent

View file

@ -9,7 +9,7 @@ use crate::{
file_path_for_file_identifier, IsolatedFilePathData,
},
prisma::{file_path, location, PrismaClient, SortOrder},
util::db::{chain_optional_iter, maybe_missing},
util::db::maybe_missing,
};
use std::{
@ -238,7 +238,7 @@ fn orphan_path_filters(
file_path_id: Option<file_path::id::Type>,
maybe_sub_iso_file_path: &Option<IsolatedFilePathData<'_>>,
) -> Vec<file_path::WhereParam> {
chain_optional_iter(
sd_utils::chain_optional_iter(
[
file_path::object_id::equals(None),
file_path::is_dir::equals(Some(false)),

View file

@ -6,14 +6,13 @@ use crate::{
},
object::{cas::generate_cas_id, object_for_file_identifier},
prisma::{file_path, location, object, PrismaClient},
sync::{self, CRDTOperation, OperationFactory, SyncManager},
util::{
db::{maybe_missing, uuid_to_bytes},
error::FileIOError,
},
util::{db::maybe_missing, error::FileIOError},
};
use sd_core_sync::SyncManager;
use sd_file_ext::{extensions::Extension, kind::ObjectKind};
use sd_prisma::prisma_sync;
use sd_sync::{CRDTOperation, OperationFactory};
use std::{
collections::{HashMap, HashSet},
@ -138,14 +137,14 @@ async fn identifier_job_step(
.map(|(pub_id, (meta, _))| {
(
sync.shared_update(
sync::file_path::SyncId {
pub_id: uuid_to_bytes(*pub_id),
prisma_sync::file_path::SyncId {
pub_id: sd_utils::uuid_to_bytes(*pub_id),
},
file_path::cas_id::NAME,
json!(&meta.cas_id),
),
db.file_path().update(
file_path::pub_id::equals(uuid_to_bytes(*pub_id)),
file_path::pub_id::equals(sd_utils::uuid_to_bytes(*pub_id)),
vec![file_path::cas_id::set(Some(meta.cas_id.clone()))],
),
)
@ -230,8 +229,8 @@ async fn identifier_job_step(
.map(|(file_path_pub_id, (meta, fp))| {
let object_pub_id = Uuid::new_v4();
let sync_id = || sync::object::SyncId {
pub_id: uuid_to_bytes(object_pub_id),
let sync_id = || prisma_sync::object::SyncId {
pub_id: sd_utils::uuid_to_bytes(object_pub_id),
};
let kind = meta.kind as i32;
@ -251,7 +250,7 @@ async fn identifier_job_step(
let object_creation_args = (
sync.shared_create(sync_id(), sync_params),
object::create_unchecked(uuid_to_bytes(object_pub_id), db_params),
object::create_unchecked(sd_utils::uuid_to_bytes(object_pub_id), db_params),
);
(object_creation_args, {
@ -319,16 +318,16 @@ fn file_path_object_connect_ops<'db>(
(
sync.shared_update(
sync::file_path::SyncId {
pub_id: uuid_to_bytes(file_path_id),
prisma_sync::file_path::SyncId {
pub_id: sd_utils::uuid_to_bytes(file_path_id),
},
file_path::object::NAME,
json!(sync::object::SyncId {
json!(prisma_sync::object::SyncId {
pub_id: vec_id.clone()
}),
),
db.file_path().update(
file_path::pub_id::equals(uuid_to_bytes(file_path_id)),
file_path::pub_id::equals(sd_utils::uuid_to_bytes(file_path_id)),
vec![file_path::object::connect(object::pub_id::equals(vec_id))],
),
)

View file

@ -7,7 +7,7 @@ use crate::{
file_path_for_file_identifier, IsolatedFilePathData,
},
prisma::{file_path, location, PrismaClient, SortOrder},
util::db::{chain_optional_iter, maybe_missing},
util::db::maybe_missing,
};
use std::path::{Path, PathBuf};
@ -120,7 +120,7 @@ fn orphan_path_filters(
file_path_id: Option<file_path::id::Type>,
sub_iso_file_path: &IsolatedFilePathData<'_>,
) -> Vec<file_path::WhereParam> {
chain_optional_iter(
sd_utils::chain_optional_iter(
[
file_path::object_id::equals(None),
file_path::is_dir::equals(Some(false)),

View file

@ -1,6 +1,7 @@
pub mod seed;
use chrono::{DateTime, FixedOffset, Utc};
use sd_prisma::prisma_sync;
use sd_sync::*;
use serde::Deserialize;
use serde_json::json;
@ -8,7 +9,7 @@ use specta::Type;
use uuid::Uuid;
use crate::{library::Library, prisma::tag, sync};
use crate::{library::Library, prisma::tag};
#[derive(Type, Deserialize, Clone)]
pub struct TagCreateArgs {
@ -28,7 +29,7 @@ impl TagCreateArgs {
db,
(
sync.shared_create(
sync::tag::SyncId {
prisma_sync::tag::SyncId {
pub_id: pub_id.clone(),
},
[

View file

@ -8,11 +8,7 @@ use crate::{
file_path_for_object_validator, IsolatedFilePathData,
},
prisma::{file_path, location},
sync::{self, OperationFactory},
util::{
db::{chain_optional_iter, maybe_missing},
error::FileIOError,
},
util::{db::maybe_missing, error::FileIOError},
};
use std::{
@ -20,6 +16,8 @@ use std::{
path::{Path, PathBuf},
};
use sd_prisma::prisma_sync;
use sd_sync::OperationFactory;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::info;
@ -101,7 +99,7 @@ impl StatefulJob for ObjectValidatorJobInit {
let steps = db
.file_path()
.find_many(chain_optional_iter(
.find_many(sd_utils::chain_optional_iter(
[
file_path::location_id::equals(Some(init.location.id)),
file_path::is_dir::equals(Some(false)),
@ -153,7 +151,7 @@ impl StatefulJob for ObjectValidatorJobInit {
sync.write_op(
db,
sync.shared_update(
sync::file_path::SyncId {
prisma_sync::file_path::SyncId {
pub_id: file_path.pub_id.clone(),
},
file_path::integrity_checksum::NAME,

View file

@ -10,7 +10,7 @@ use futures::Stream;
use sd_p2p::{
spaceblock::{BlockSize, SpaceblockRequest, Transfer},
spacetunnel::{Identity, Tunnel},
Event, Manager, ManagerError, MetadataManager, PeerId,
Event, Manager, ManagerError, ManagerStream, MetadataManager, PeerId,
};
use sd_sync::CRDTOperation;
use serde::Serialize;
@ -25,10 +25,9 @@ use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::{
library::{LibraryManager, SubscriberEvent},
library::LibraryManager,
node::{NodeConfig, NodeConfigManager},
p2p::{OperatingSystem, SPACEDRIVE_APP_ID},
sync::SyncMessage,
};
use super::{Header, PairingManager, PairingStatus, PeerMetadata};
@ -69,25 +68,23 @@ pub struct P2PManager {
pub metadata_manager: Arc<MetadataManager<PeerMetadata>>,
pub spacedrop_progress: Arc<Mutex<HashMap<Uuid, broadcast::Sender<u8>>>>,
pub pairing: Arc<PairingManager>,
library_manager: Arc<LibraryManager>,
}
impl P2PManager {
pub async fn new(
node_config: Arc<NodeConfigManager>,
library_manager: Arc<LibraryManager>,
) -> Result<Arc<Self>, ManagerError> {
) -> Result<(Arc<P2PManager>, ManagerStream<PeerMetadata>), ManagerError> {
let (config, keypair) = {
let config = node_config.get().await;
(
Self::config_to_metadata(&config, &library_manager).await,
Self::config_to_metadata(&config /* , &library_manager */).await,
config.keypair,
)
};
let metadata_manager = MetadataManager::new(config);
let (manager, mut stream) =
let (manager, stream) =
Manager::new(SPACEDRIVE_APP_ID, &keypair, metadata_manager.clone()).await?;
info!(
@ -102,13 +99,55 @@ impl P2PManager {
let spacedrop_pairing_reqs = Arc::new(Mutex::new(HashMap::new()));
let spacedrop_progress = Arc::new(Mutex::new(HashMap::new()));
let pairing = PairingManager::new(manager.clone(), tx.clone(), library_manager.clone());
let pairing = PairingManager::new(manager.clone(), tx.clone());
// TODO: proper shutdown
// https://docs.rs/ctrlc/latest/ctrlc/
// https://docs.rs/system_shutdown/latest/system_shutdown/
let this = Arc::new(Self {
pairing,
events: (tx, rx),
manager,
spacedrop_pairing_reqs,
metadata_manager,
spacedrop_progress,
});
// library_manager
// .subscribe({
// let this = this.clone();
// move |event| match event {
// SubscriberEvent::Load(library_id, library_identity, mut sync_rx) => {
// let this = this.clone();
// }
// }
// })
// .await;
// TODO: Probs remove this once connection timeout/keepalive are working correctly
tokio::spawn({
let events = tx.clone();
let spacedrop_pairing_reqs = spacedrop_pairing_reqs.clone();
let spacedrop_progress = spacedrop_progress.clone();
let library_manager = library_manager.clone();
let pairing = pairing.clone();
let this = this.clone();
async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
this.ping().await;
}
}
});
Ok((this, stream))
}
pub fn start(
&self,
mut stream: ManagerStream<PeerMetadata>,
library_manager: Arc<LibraryManager>,
) {
tokio::spawn({
let events = self.events.0.clone();
let spacedrop_pairing_reqs = self.spacedrop_pairing_reqs.clone();
let spacedrop_progress = self.spacedrop_progress.clone();
let pairing = self.pairing.clone();
async move {
let mut shutdown = false;
@ -204,7 +243,9 @@ impl P2PManager {
};
}
Header::Pair => {
pairing.responder(event.peer_id, stream).await;
pairing
.responder(event.peer_id, stream, library_manager)
.await;
}
Header::Sync(library_id) => {
let mut stream = Tunnel::from_stream(stream).await.unwrap();
@ -228,14 +269,12 @@ impl P2PManager {
};
for op in operations {
library.sync.ingest_op(op).await.unwrap_or_else(
|err| {
error!(
"error ingesting operation for library '{}': {err:?}",
library.id
);
},
);
library.sync.apply_op(op).await.unwrap_or_else(|err| {
error!(
"error ingesting operation for library '{}': {err:?}",
library.id
);
});
}
}
}
@ -259,57 +298,11 @@ impl P2PManager {
}
}
});
// TODO: proper shutdown
// https://docs.rs/ctrlc/latest/ctrlc/
// https://docs.rs/system_shutdown/latest/system_shutdown/
let this = Arc::new(Self {
pairing,
events: (tx, rx),
manager,
spacedrop_pairing_reqs,
metadata_manager,
spacedrop_progress,
library_manager: library_manager.clone(),
});
library_manager
.subscribe({
let this = this.clone();
move |event| match event {
SubscriberEvent::Load(library_id, library_identity, mut sync_rx) => {
let this = this.clone();
tokio::spawn(async move {
while let Ok(op) = sync_rx.recv().await {
let SyncMessage::Created(op) = op else { continue; };
this.broadcast_sync_events(library_id, &library_identity, vec![op])
.await;
}
});
}
}
})
.await;
// TODO: Probs remove this once connection timeout/keepalive are working correctly
tokio::spawn({
let this = this.clone();
async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
this.ping().await;
}
}
});
Ok(this)
}
async fn config_to_metadata(
config: &NodeConfig,
library_manager: &LibraryManager,
// library_manager: &LibraryManager,
) -> PeerMetadata {
PeerMetadata {
name: config.name.clone(),
@ -317,24 +310,27 @@ impl P2PManager {
version: Some(env!("CARGO_PKG_VERSION").to_string()),
email: config.p2p_email.clone(),
img_url: config.p2p_img_url.clone(),
instances: library_manager
.get_all_instances()
.await
.into_iter()
.filter_map(|i| {
Identity::from_bytes(&i.identity)
.map(|i| hex::encode(i.public_key().to_bytes()))
.ok()
})
.collect(),
// instances: library_manager
// .get_all_instances()
// .await
// .into_iter()
// .filter_map(|i| {
// Identity::from_bytes(&i.identity)
// .map(|i| hex::encode(i.public_key().to_bytes()))
// .ok()
// })
// .collect(),
}
}
#[allow(unused)] // TODO: Should probs be using this
pub async fn update_metadata(&self, node_config_manager: &NodeConfigManager) {
self.metadata_manager.update(
Self::config_to_metadata(&node_config_manager.get().await, &self.library_manager).await,
);
pub async fn update_metadata(
&self,
node_config_manager: &NodeConfigManager,
library_manager: &LibraryManager,
) {
self.metadata_manager
.update(Self::config_to_metadata(&node_config_manager.get().await).await);
}
pub async fn accept_spacedrop(&self, id: Uuid, path: String) {
@ -358,7 +354,10 @@ impl P2PManager {
library_id: Uuid,
_identity: &Identity,
event: Vec<CRDTOperation>,
library_manager: &LibraryManager,
) {
println!("broadcasting sync events!");
let mut buf = match rmp_serde::to_vec_named(&event) {
Ok(buf) => buf,
Err(e) => {
@ -374,7 +373,7 @@ impl P2PManager {
// TODO: Establish a connection to them
let _library = self.library_manager.get_library(library_id).await.unwrap();
let _library = library_manager.get_library(library_id).await.unwrap();
todo!();

View file

@ -38,21 +38,21 @@ pub struct PairingManager {
events_tx: broadcast::Sender<P2PEvent>,
pairing_response: RwLock<HashMap<u16, oneshot::Sender<PairingDecision>>>,
manager: Arc<Manager<PeerMetadata>>,
library_manager: Arc<LibraryManager>,
// library_manager: Arc<LibraryManager>,
}
impl PairingManager {
pub fn new(
manager: Arc<Manager<PeerMetadata>>,
events_tx: broadcast::Sender<P2PEvent>,
library_manager: Arc<LibraryManager>,
// library_manager: Arc<LibraryManager>,
) -> Arc<Self> {
Arc::new(Self {
id: AtomicU16::new(0),
events_tx,
pairing_response: RwLock::new(HashMap::new()),
manager,
library_manager,
// library_manager,
})
}
@ -70,7 +70,12 @@ impl PairingManager {
// TODO: Error handling
pub async fn originator(self: Arc<Self>, peer_id: PeerId, node_config: NodeConfig) -> u16 {
pub async fn originator(
self: Arc<Self>,
peer_id: PeerId,
node_config: NodeConfig,
library_manager: Arc<LibraryManager>,
) -> u16 {
// TODO: Timeout for max number of pairings in a time period
let pairing_id = self.id.fetch_add(1, Ordering::SeqCst);
@ -119,8 +124,7 @@ impl PairingManager {
// TODO: Future - Library in pairing state
// TODO: Create library
if self
.library_manager
if library_manager
.get_all_libraries()
.await
.into_iter()
@ -134,8 +138,7 @@ impl PairingManager {
return;
}
let library_config = self
.library_manager
let library_config = library_manager
.create_with_uuid(
library_id,
LibraryName::new(library_name).unwrap(),
@ -145,8 +148,7 @@ impl PairingManager {
)
.await
.unwrap();
let library = self
.library_manager
let library = library_manager
.get_library(library_config.uuid)
.await
.unwrap();
@ -207,6 +209,7 @@ impl PairingManager {
self: Arc<Self>,
peer_id: PeerId,
mut stream: impl AsyncRead + AsyncWrite + Unpin,
library_manager: Arc<LibraryManager>,
) {
let pairing_id = self.id.fetch_add(1, Ordering::SeqCst);
self.emit_progress(pairing_id, PairingStatus::EstablishingConnection);
@ -239,7 +242,7 @@ impl PairingManager {
};
info!("The user accepted pairing '{pairing_id}' for library '{library_id}'!");
let library = self.library_manager.get_library(library_id).await.unwrap();
let library = library_manager.get_library(library_id).await.unwrap();
stream
.write_all(
&PairingResponse::Accepted {

View file

@ -40,6 +40,7 @@ impl From<Instance> for instance::CreateUnchecked {
node_platform: i.node_platform as i32,
last_seen: i.last_seen.into(),
date_created: i.date_created.into(),
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![],
}
}

View file

@ -1,6 +1,5 @@
use std::{collections::HashMap, env, str::FromStr};
use itertools::Itertools;
use sd_p2p::Metadata;
use serde::{Deserialize, Serialize};
use specta::Type;
@ -14,7 +13,7 @@ pub struct PeerMetadata {
pub(super) version: Option<String>,
pub(super) email: Option<String>,
pub(super) img_url: Option<String>,
pub(super) instances: Vec<String>,
// pub(super) instances: Vec<String>,
}
impl Metadata for PeerMetadata {
@ -33,7 +32,7 @@ impl Metadata for PeerMetadata {
if let Some(img_url) = self.img_url {
map.insert("img_url".to_owned(), img_url);
}
map.insert("instances".to_owned(), self.instances.into_iter().join(","));
// map.insert("instances".to_owned(), self.instances.into_iter().join(","));
map
}
@ -56,15 +55,15 @@ impl Metadata for PeerMetadata {
version: data.get("version").map(|v| v.to_owned()),
email: data.get("email").map(|v| v.to_owned()),
img_url: data.get("img_url").map(|v| v.to_owned()),
instances: data
.get("instances")
.ok_or_else(|| {
"DNS record for field 'instances' missing. Unable to decode 'PeerMetadata'!"
.to_owned()
})?
.split(',')
.map(|s| s.parse().map_err(|_| "Unable to parse instance 'Uuid'!"))
.collect::<Result<Vec<_>, _>>()?,
// instances: data
// .get("instances")
// .ok_or_else(|| {
// "DNS record for field 'instances' missing. Unable to decode 'PeerMetadata'!"
// .to_owned()
// })?
// .split(',')
// .map(|s| s.parse().map_err(|_| "Unable to parse instance 'Uuid'!"))
// .collect::<Result<Vec<_>, _>>()?,
})
}
}

View file

@ -107,7 +107,7 @@ impl PreferenceKVs {
self
}
pub fn to_upserts(self, db: &PrismaClient) -> Vec<preference::UpsertQuery> {
pub fn into_upserts(self, db: &PrismaClient) -> Vec<preference::UpsertQuery> {
self.0
.into_iter()
.map(|(key, value)| {

View file

@ -25,7 +25,7 @@ impl LibraryPreferences {
pub async fn write(self, db: &PrismaClient) -> prisma_client_rust::Result<()> {
let kvs = self.to_kvs();
db._batch(kvs.to_upserts(db)).await?;
db._batch(kvs.into_upserts(db)).await?;
Ok(())
}

View file

@ -1,220 +0,0 @@
#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Brendan remove this once you've got error handling here
use crate::prisma::*;
use sd_sync::*;
use std::{collections::HashMap, sync::Arc};
use serde_json::to_vec;
use tokio::sync::broadcast::{self, Receiver, Sender};
use uhlc::{HLCBuilder, HLC, NTP64};
use uuid::Uuid;
use super::ModelSyncData;
#[derive(Clone)]
pub enum SyncMessage {
Ingested(CRDTOperation),
Created(CRDTOperation),
}
pub struct SyncManager {
db: Arc<PrismaClient>,
instance: Uuid,
_clocks: HashMap<Uuid, NTP64>,
clock: HLC,
pub tx: Sender<SyncMessage>,
}
impl SyncManager {
pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> (Self, Receiver<SyncMessage>) {
let (tx, rx) = broadcast::channel(64);
(
Self {
db: db.clone(),
instance,
clock: HLCBuilder::new().with_id(instance.into()).build(),
_clocks: Default::default(),
tx,
},
rx,
)
}
pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>(
&self,
tx: &PrismaClient,
(_ops, queries): (Vec<CRDTOperation>, I),
) -> prisma_client_rust::Result<<I as prisma_client_rust::BatchItemParent>::ReturnValue> {
#[cfg(feature = "sync-messages")]
let res = {
let shared = _ops
.iter()
.filter_map(|op| match &op.typ {
CRDTOperationType::Shared(shared_op) => {
let kind = match &shared_op.data {
SharedOperationData::Create => "c",
SharedOperationData::Update { .. } => "u",
SharedOperationData::Delete => "d",
};
Some(tx.shared_operation().create(
op.id.as_bytes().to_vec(),
op.timestamp.0 as i64,
shared_op.model.to_string(),
to_vec(&shared_op.record_id).unwrap(),
kind.to_string(),
to_vec(&shared_op.data).unwrap(),
instance::pub_id::equals(op.instance.as_bytes().to_vec()),
vec![],
))
}
_ => None,
})
.collect::<Vec<_>>();
let (res, _) = tx._batch((queries, shared)).await?;
for op in _ops {
self.tx.send(SyncMessage::Created(op)).ok();
}
res
};
#[cfg(not(feature = "sync-messages"))]
let res = tx._batch([queries]).await?.remove(0);
Ok(res)
}
#[allow(unused_variables)]
pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>(
&self,
tx: &PrismaClient,
op: CRDTOperation,
query: Q,
) -> prisma_client_rust::Result<<Q as prisma_client_rust::BatchItemParent>::ReturnValue> {
#[cfg(feature = "sync-messages")]
let ret = {
let ret = match &op.typ {
CRDTOperationType::Shared(shared_op) => {
let kind = match &shared_op.data {
SharedOperationData::Create => "c",
SharedOperationData::Update { .. } => "u",
SharedOperationData::Delete => "d",
};
tx._batch((
tx.shared_operation().create(
op.id.as_bytes().to_vec(),
op.timestamp.0 as i64,
shared_op.model.to_string(),
to_vec(&shared_op.record_id).unwrap(),
kind.to_string(),
to_vec(&shared_op.data).unwrap(),
instance::pub_id::equals(op.instance.as_bytes().to_vec()),
vec![],
),
query,
))
.await?
.1
}
_ => todo!(),
};
self.tx.send(SyncMessage::Created(op)).ok();
ret
};
#[cfg(not(feature = "sync-messages"))]
let ret = tx._batch(vec![query]).await?.remove(0);
Ok(ret)
}
pub async fn get_ops(&self) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
Ok(self
.db
.shared_operation()
.find_many(vec![])
.order_by(shared_operation::timestamp::order(SortOrder::Asc))
.include(shared_operation::include!({ instance: select {
pub_id
} }))
.exec()
.await?
.into_iter()
.flat_map(|op| {
Some(CRDTOperation {
id: Uuid::from_slice(&op.id).ok()?,
instance: Uuid::from_slice(&op.instance.pub_id).ok()?,
timestamp: NTP64(op.timestamp as u64),
typ: CRDTOperationType::Shared(SharedOperation {
record_id: serde_json::from_slice(&op.record_id).ok()?,
model: op.model,
data: serde_json::from_slice(&op.data).ok()?,
}),
})
})
.collect())
}
pub async fn ingest_op(&self, op: CRDTOperation) -> prisma_client_rust::Result<()> {
let db = &self.db;
if db
.instance()
.find_unique(instance::pub_id::equals(op.instance.as_bytes().to_vec()))
.exec()
.await?
.is_none()
{
panic!("Node is not paired!")
}
let msg = SyncMessage::Ingested(op.clone());
ModelSyncData::from_op(op.typ.clone())
.unwrap()
.exec(db)
.await?;
if let CRDTOperationType::Shared(shared_op) = op.typ {
let kind = match &shared_op.data {
SharedOperationData::Create => "c",
SharedOperationData::Update { .. } => "u",
SharedOperationData::Delete => "d",
};
db.shared_operation()
.create(
op.id.as_bytes().to_vec(),
op.timestamp.0 as i64,
shared_op.model.to_string(),
to_vec(&shared_op.record_id).unwrap(),
kind.to_string(),
to_vec(&shared_op.data).unwrap(),
instance::pub_id::equals(op.instance.as_bytes().to_vec()),
vec![],
)
.exec()
.await?;
}
self.tx.send(msg).ok();
Ok(())
}
}
impl OperationFactory for SyncManager {
fn get_clock(&self) -> &HLC {
&self.clock
}
fn get_instance(&self) -> Uuid {
self.instance
}
}

View file

@ -1,5 +0,0 @@
mod manager;
pub use crate::prisma_sync::*;
pub use manager::*;
pub use sd_sync::*;

View file

@ -1,7 +1,6 @@
use crate::prisma::{self, PrismaClient};
use prisma_client_rust::{migrations::*, NewClientError};
use thiserror::Error;
use uuid::Uuid;
/// MigrationError represents an error that occurring while opening a initialising and running migrations on the database.
#[derive(Error, Debug)]
@ -58,28 +57,6 @@ pub async fn load_and_migrate(db_url: &str) -> Result<PrismaClient, MigrationErr
Ok(client)
}
/// Combines an iterator of `T` and an iterator of `Option<T>`,
/// removing any `None` values in the process
pub fn chain_optional_iter<T>(
required: impl IntoIterator<Item = T>,
optional: impl IntoIterator<Item = Option<T>>,
) -> Vec<T> {
required
.into_iter()
.map(Some)
.chain(optional)
.flatten()
.collect()
}
pub fn uuid_to_bytes(uuid: Uuid) -> Vec<u8> {
uuid.as_bytes().to_vec()
}
pub fn from_bytes_to_uuid(bytes: &[u8]) -> Uuid {
Uuid::from_slice(bytes).expect("corrupted uuid in database")
}
pub fn inode_from_db(db_inode: &[u8]) -> u64 {
u64::from_le_bytes(db_inode.try_into().expect("corrupted inode in database"))
}

View file

@ -3,6 +3,7 @@
use std::{
io,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
@ -96,7 +97,7 @@ impl InitConfig {
pub async fn apply(
self,
library_manager: &LibraryManager,
library_manager: &Arc<LibraryManager>,
node_cfg: NodeConfig,
) -> Result<(), InitConfigError> {
info!("Initializing app from file: {:?}", self.path);

View file

@ -43,7 +43,7 @@ impl<'a> ModelSyncType<'a> {
.field("id")
.and_then(|field| match field {
AttributeFieldValue::Single(s) => Some(s),
AttributeFieldValue::List(l) => None,
AttributeFieldValue::List(_) => None,
})
.and_then(|name| model.fields().find(|f| f.name() == *name))?;
@ -58,20 +58,20 @@ impl<'a> ModelSyncType<'a> {
attr.field(name)
.and_then(|field| match field {
AttributeFieldValue::Single(s) => Some(*s),
AttributeFieldValue::List(l) => None,
AttributeFieldValue::List(_) => None,
})
.and_then(|name| {
match model
.fields()
.find(|f| f.name() == name)
.expect(&format!("'{name}' field not found"))
.unwrap_or_else(|| panic!("'{name}' field not found"))
.refine()
{
RefinedFieldWalker::Relation(r) => Some(r),
_ => None,
}
})
.expect(&format!("'{name}' must be a relation field"))
.unwrap_or_else(|| panic!("'{name}' must be a relation field"))
};
Self::Relation {
@ -88,10 +88,9 @@ impl<'a> ModelSyncType<'a> {
fn sync_id(&self) -> Vec<FieldWalker> {
match self {
// Self::Owned { id } => id.clone(),
Self::Local { id } => vec![id.clone()],
Self::Shared { id } => vec![id.clone()],
Self::Local { id } => vec![*id],
Self::Shared { id } => vec![*id],
Self::Relation { group, item } => vec![(*group).into(), (*item).into()],
_ => vec![],
}
}
}

View file

@ -6,6 +6,36 @@ use specta::Type;
use uhlc::NTP64;
use uuid::Uuid;
pub enum OperationKind<'a> {
Create,
Update(&'a str),
Delete,
}
impl std::fmt::Display for OperationKind<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OperationKind::Create => write!(f, "c"),
OperationKind::Update(field) => write!(f, "u:{}", field),
OperationKind::Delete => write!(f, "d"),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
pub struct RelationOperation {
pub relation_item: Value,
pub relation_group: Value,
pub relation: String,
pub data: RelationOperationData,
}
impl RelationOperation {
pub fn kind(&self) -> OperationKind {
self.data.as_kind()
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
pub enum RelationOperationData {
#[serde(rename = "c")]
@ -16,12 +46,27 @@ pub enum RelationOperationData {
Delete,
}
impl RelationOperationData {
fn as_kind(&self) -> OperationKind {
match self {
Self::Create => OperationKind::Create,
Self::Update { field, .. } => OperationKind::Update(field),
Self::Delete => OperationKind::Delete,
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
pub struct RelationOperation {
pub relation_item: Value,
pub relation_group: Value,
pub relation: String,
pub data: RelationOperationData,
pub struct SharedOperation {
pub record_id: Value,
pub model: String,
pub data: SharedOperationData,
}
impl SharedOperation {
pub fn kind(&self) -> OperationKind {
self.data.as_kind()
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
@ -34,11 +79,14 @@ pub enum SharedOperationData {
Delete,
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
pub struct SharedOperation {
pub record_id: Value,
pub model: String,
pub data: SharedOperationData,
impl SharedOperationData {
fn as_kind(&self) -> OperationKind {
match self {
Self::Create => OperationKind::Create,
Self::Update { field, .. } => OperationKind::Update(field),
Self::Delete => OperationKind::Delete,
}
}
}
// #[derive(Serialize, Deserialize, Clone, Debug, Type)]

View file

@ -5,3 +5,160 @@ mod model_traits;
pub use crdt::*;
pub use factory::*;
pub use model_traits::*;
// fn compare_messages(&self, operations: Vec<CRDTOperation>) -> Vec<(CRDTOperation, bool)> {
// operations
// .into_iter()
// .map(|op| (op.id, op))
// .collect::<HashMap<_, _>>()
// .into_iter()
// .filter_map(|(_, op)| {
// match &op.typ {
// CRDTOperationType::Owned(_) => {
// self._operations.iter().find(|find_op| match &find_op.typ {
// CRDTOperationType::Owned(_) => {
// find_op.timestamp >= op.timestamp && find_op.node == op.node
// }
// _ => false,
// })
// }
// CRDTOperationType::Shared(shared_op) => {
// self._operations.iter().find(|find_op| match &find_op.typ {
// CRDTOperationType::Shared(find_shared_op) => {
// shared_op.model == find_shared_op.model
// && shared_op.record_id == find_shared_op.record_id
// && find_op.timestamp >= op.timestamp
// }
// _ => false,
// })
// }
// CRDTOperationType::Relation(relation_op) => {
// self._operations.iter().find(|find_op| match &find_op.typ {
// CRDTOperationType::Relation(find_relation_op) => {
// relation_op.relation == find_relation_op.relation
// && relation_op.relation_item == find_relation_op.relation_item
// && relation_op.relation_group == find_relation_op.relation_group
// }
// _ => false,
// })
// }
// }
// .map(|old_op| (old_op.timestamp != op.timestamp).then_some(true))
// .unwrap_or(Some(false))
// .map(|old| (op, old))
// })
// .collect()
// }
// pub fn receive_crdt_operations(&mut self, ops: Vec<CRDTOperation>) {
// for op in &ops {
// self._clock
// .update_with_timestamp(&Timestamp::new(op.timestamp, op.node.into()))
// .ok();
// self._clocks.insert(op.node, op.timestamp);
// }
// for (op, old) in self.compare_messages(ops) {
// let push_op = op.clone();
// if !old {
// match op.typ {
// CRDTOperationType::Shared(shared_op) => match shared_op.model.as_str() {
// "Object" => {
// let id = shared_op.record_id;
// match shared_op.data {
// SharedOperationData::Create(SharedOperationCreateData::Atomic) => {
// self.objects.insert(
// id,
// Object {
// id,
// ..Default::default()
// },
// );
// }
// SharedOperationData::Update { field, value } => {
// let mut file = self.objects.get_mut(&id).unwrap();
// match field.as_str() {
// "name" => {
// file.name = from_value(value).unwrap();
// }
// _ => unreachable!(),
// }
// }
// SharedOperationData::Delete => {
// self.objects.remove(&id).unwrap();
// }
// _ => {}
// }
// }
// _ => unreachable!(),
// },
// CRDTOperationType::Owned(owned_op) => match owned_op.model.as_str() {
// "FilePath" => {
// for item in owned_op.items {
// let id = from_value(item.id).unwrap();
// match item.data {
// OwnedOperationData::Create(data) => {
// self.file_paths.insert(
// id,
// from_value(Value::Object(data.into_iter().collect()))
// .unwrap(),
// );
// }
// OwnedOperationData::Update(data) => {
// let obj = self.file_paths.get_mut(&id).unwrap();
// for (key, value) in data {
// match key.as_str() {
// "path" => obj.path = from_value(value).unwrap(),
// "file" => obj.file = from_value(value).unwrap(),
// _ => unreachable!(),
// }
// }
// }
// OwnedOperationData::Delete => {
// self.file_paths.remove(&id);
// }
// }
// }
// }
// _ => unreachable!(),
// },
// CRDTOperationType::Relation(relation_op) => match relation_op.relation.as_str()
// {
// "TagOnObject" => match relation_op.data {
// RelationOperationData::Create => {
// self.tags_on_objects.insert(
// (relation_op.relation_item, relation_op.relation_group),
// TagOnObject {
// object_id: relation_op.relation_item,
// tag_id: relation_op.relation_group,
// },
// );
// }
// RelationOperationData::Update { field: _, value: _ } => {
// // match field.as_str() {
// // _ => unreachable!(),
// // }
// }
// RelationOperationData::Delete => {
// self.tags_on_objects
// .remove(&(
// relation_op.relation_item,
// relation_op.relation_group,
// ))
// .unwrap();
// }
// },
// _ => unreachable!(),
// },
// }
// self._operations.push(push_op)
// }
// }
// }

9
crates/utils/Cargo.toml Normal file
View file

@ -0,0 +1,9 @@
[package]
name = "sd-utils"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
uuid = { workspace = true }

23
crates/utils/src/lib.rs Normal file
View file

@ -0,0 +1,23 @@
use uuid::Uuid;
/// Combines an iterator of `T` and an iterator of `Option<T>`,
/// removing any `None` values in the process.
///
/// The required items always come first, in their original order,
/// followed by the `Some` values of `optional` in their original order.
pub fn chain_optional_iter<T>(
    required: impl IntoIterator<Item = T>,
    optional: impl IntoIterator<Item = Option<T>>,
) -> Vec<T> {
    // Start from the unconditional items, then append only the present
    // optional ones; `flatten` on `Option` iterators drops every `None`.
    let mut items: Vec<T> = required.into_iter().collect();
    items.extend(optional.into_iter().flatten());
    items
}
/// Converts a [`Uuid`] into the owned 16-byte vector form used for
/// database storage.
pub fn uuid_to_bytes(uuid: Uuid) -> Vec<u8> {
    // `as_bytes` yields a `&[u8; 16]`; copy it into an owned Vec.
    Vec::from(*uuid.as_bytes())
}
/// Reconstructs a [`Uuid`] from its raw byte representation as stored in
/// the database.
///
/// # Panics
/// Panics if `bytes` is not exactly 16 bytes long — a length mismatch is
/// treated as database corruption rather than a recoverable error.
pub fn from_bytes_to_uuid(bytes: &[u8]) -> Uuid {
    Uuid::from_slice(bytes).expect("corrupted uuid in database")
}

View file

@ -8,7 +8,7 @@ import {
FilmStrip,
Planet
} from 'phosphor-react';
import { LibraryContextProvider, useClientContext } from '@sd/client';
import { LibraryContextProvider, useClientContext, useFeatureFlag } from '@sd/client';
import { SubtleButton } from '~/components/SubtleButton';
import Icon from './Icon';
import { LibrarySection } from './LibrarySection';
@ -33,6 +33,12 @@ export default () => {
<Icon component={ArchiveBox} />
Imports
</SidebarLink> */}
{useFeatureFlag('syncRoute') && (
<SidebarLink to="sync">
<Icon component={ArrowsClockwise} />
Sync
</SidebarLink>
)}
</div>
{library && (
<LibraryContextProvider library={library}>

View file

@ -1,39 +1,27 @@
import { CRDTOperation, useLibraryQuery, useLibrarySubscription } from '@sd/client';
import { tw } from '@sd/ui';
import { useMemo } from 'react';
import { stringify } from 'uuid';
import {
CRDTOperation,
RelationOperation,
SharedOperation,
useLibraryQuery,
useLibrarySubscription
} from '@sd/client';
const Label = tw.span`text-gray-300`;
const Pill = tw.span`rounded-full bg-gray-500 px-2 py-1`;
const Row = tw.p`overflow-hidden text-ellipsis space-x-1`;
const OperationItem = ({ op }: { op: CRDTOperation }) => {
let contents = null;
if ('model' in op.typ) {
let subContents = null;
if (op.typ.data === 'd') subContents = 'Delete';
else if (op.typ.data === 'c') subContents = 'Create';
else subContents = `Update - ${op.typ.data.u.field}`;
contents = (
<>
<div className="space-x-2">
<Pill>{subContents}</Pill>
</div>
<Row>
<Label>Model</Label>
<span>{op.typ.model}</span>
</Row>
<Row>
<Label>Time</Label>
<span>{op.timestamp}</span>
</Row>
</>
);
}
return <li className="space-y-1 rounded-md bg-gray-700 p-2 text-sm">{contents}</li>;
};
// A run of consecutive sync operations that all target the same record,
// so the debug page can render them under a single header.
// - 'Shared' groups operations on one record of one model (keyed by model + id).
// - 'Relation' groups operations on one relation entry (keyed by relation +
//   item_id + group_id).
// `messages` preserves the original operation order; `timestamp` is the
// operation's HLC timestamp as received from the backend.
type MessageGroup =
	| {
			variant: 'Shared';
			model: string;
			id: string;
			messages: { op: SharedOperation; timestamp: number }[];
	  }
	| {
			variant: 'Relation';
			relation: string;
			item_id: string;
			group_id: string;
			messages: { op: RelationOperation; timestamp: number }[];
	  };
export const Component = () => {
const messages = useLibraryQuery(['sync.messages']);
@ -42,11 +30,156 @@ export const Component = () => {
onData: () => messages.refetch()
});
const groups = useMemo(() => {
if (messages.data) return calculateGroups(messages.data);
}, [messages]);
return (
<ul className="space-y-2">
{messages.data?.map((op) => (
<OperationItem key={op.id} op={op} />
<ul className="space-y-4 p-4">
{groups?.map((group, index) => (
<OperationGroup key={index} group={group} />
))}
</ul>
);
};
// Renders one group of sync messages: a header identifying the affected
// record (or relation), followed by one row per operation in the group.
const OperationGroup: React.FC<{ group: MessageGroup }> = ({ group }) => {
	let header: JSX.Element;
	let contents: JSX.Element;

	if (group.variant === 'Shared') {
		header = (
			<div className="flex items-center space-x-2 p-2">
				<span>{group.model}</span>
				<span className="">{group.id}</span>
			</div>
		);
		contents = (
			<ul className="flex flex-col space-y-2 p-2">
				{group.messages.map((msg, i) => (
					<li key={i} className="flex flex-row justify-between px-2">
						{typeof msg.op.data === 'string' ? (
							<p>{msg.op.data === 'c' ? 'Create' : 'Delete'}</p>
						) : (
							<p>Update - {msg.op.data.u.field}</p>
						)}
						<p className="text-gray-400">{msg.timestamp}</p>
					</li>
				))}
			</ul>
		);
	} else {
		header = (
			<div className="flex items-center space-x-2 p-2">
				<span>{group.relation}</span>
				<span className="">{group.item_id}</span>
				<span className="">in</span>
				<span className="">{group.group_id}</span>
			</div>
		);
		// Row markup intentionally mirrors the 'Shared' branch; duplicating it
		// keeps TypeScript's narrowing of the per-variant operation type.
		contents = (
			<ul className="flex flex-col space-y-2 p-2">
				{group.messages.map((msg, i) => (
					<li key={i} className="flex flex-row justify-between px-2">
						{typeof msg.op.data === 'string' ? (
							<p>{msg.op.data === 'c' ? 'Create' : 'Delete'}</p>
						) : (
							<p>Update - {msg.op.data.u.field}</p>
						)}
						<p className="text-gray-400">{msg.timestamp}</p>
					</li>
				))}
			</ul>
		);
	}

	return (
		<div className="divide-y divide-gray bg-app-darkBox">
			{header}
			{contents}
		</div>
	);
};
// Folds an ordered list of CRDT operations into contiguous groups, starting
// a new group whenever the target record (model + id, or relation + item +
// group) differs from the previous operation's target.
function calculateGroups(messages: CRDTOperation[]) {
	const groups: MessageGroup[] = [];

	for (const message of messages) {
		const { typ } = message;

		if ('model' in typ) {
			// Shared operation: grouped by model name and record pub_id.
			const id = stringify(typ.record_id.pub_id);
			const tail = groups[groups.length - 1];

			let group: Extract<MessageGroup, { variant: 'Shared' }>;
			if (
				tail &&
				tail.variant === 'Shared' &&
				tail.model === typ.model &&
				tail.id === id
			) {
				group = tail;
			} else {
				group = { variant: 'Shared', model: typ.model, id, messages: [] };
				groups.push(group);
			}

			group.messages.push({ op: typ, timestamp: message.timestamp });
		} else {
			// Relation operation: grouped by relation name plus both sides of
			// the relation (item and group pub_ids).
			const itemId = stringify(typ.relation_item.pub_id);
			const groupId = stringify(typ.relation_group.pub_id);
			const tail = groups[groups.length - 1];

			let group: Extract<MessageGroup, { variant: 'Relation' }>;
			if (
				tail &&
				tail.variant === 'Relation' &&
				tail.relation === typ.relation &&
				tail.item_id === itemId &&
				tail.group_id === groupId
			) {
				group = tail;
			} else {
				group = {
					variant: 'Relation',
					relation: typ.relation,
					item_id: itemId,
					group_id: groupId,
					messages: []
				};
				groups.push(group);
			}

			group.messages.push({ op: typ, timestamp: message.timestamp });
		}
	}

	return groups;
}

View file

@ -51,11 +51,10 @@ export default function OnboardingCreatingLibrary() {
// it feels more fitting to configure it here (once)
telemetryStore.shareTelemetry = obStore.shareTelemetry;
console.log('creating');
createLibrary.mutate({
name: obStore.newLibraryName
});
return;
};
const created = useRef(false);

View file

@ -261,7 +261,7 @@ export type PairingStatus = { type: "EstablishingConnection" } | { type: "Pairin
export type PeerId = string
export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; version: string | null; email: string | null; img_url: string | null; instances: string[] }
export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; version: string | null; email: string | null; img_url: string | null }
export type RelationOperation = { relation_item: any; relation_group: any; relation: string; data: RelationOperationData }

View file

@ -2,9 +2,9 @@ import { useEffect } from 'react';
import { subscribe, useSnapshot } from 'valtio';
import { valtioPersist } from '../lib/valito';
export const features = ['spacedrop', 'p2pPairing'] as const;
export const features = ['spacedrop', 'p2pPairing', 'syncRoute'] as const;
export type FeatureFlag = (typeof features)[number];
export type FeatureFlag = typeof features[number];
const featureFlagState = valtioPersist('sd-featureFlags', {
enabled: [] as FeatureFlag[]

View file

@ -1,4 +1,4 @@
import { PropsWithChildren, createContext, useContext, useState } from 'react';
import { PropsWithChildren, createContext, useState } from 'react';
import { Notification } from '../core';
import { useBridgeSubscription } from '../rspc';