Sync ingesting (#1181)

* sync + ingest refactor

* fix Event enum description

* actually do sync over network

* re-enable heif

* remove comment
Brendan Allan 2023-08-07 00:52:43 -07:00 committed by GitHub
parent 9e919f9909
commit 00e4aa9e8f
26 changed files with 1806 additions and 1850 deletions

Cargo.lock (generated, 1592 changed lines): file diff suppressed because it is too large.


@ -19,19 +19,19 @@ edition = "2021"
repository = "https://github.com/spacedriveapp/spacedrive"
[workspace.dependencies]
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "047c1102284a61ee400788baa7529d54ae745790", features = [
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust", branch = "spacedrive", features = [
"rspc",
"sqlite-create-many",
"migrations",
"sqlite",
], default-features = false }
prisma-client-rust-cli = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "047c1102284a61ee400788baa7529d54ae745790", features = [
prisma-client-rust-cli = { git = "https://github.com/Brendonovich/prisma-client-rust", branch = "spacedrive", features = [
"rspc",
"sqlite-create-many",
"migrations",
"sqlite",
], default-features = false }
prisma-client-rust-sdk = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "047c1102284a61ee400788baa7529d54ae745790", features = [
prisma-client-rust-sdk = { git = "https://github.com/Brendonovich/prisma-client-rust", branch = "spacedrive", features = [
"sqlite",
], default-features = false }


@ -1,5 +1,5 @@
[package]
name = "cli"
name = "sd-cli"
version = "0.1.0"
license = { workspace = true }
repository = { workspace = true }


@ -1,9 +1,9 @@
[package]
name = "spacedrive"
name = "sd-desktop"
version = "0.1.0"
description = "The universal file manager."
authors = ["Spacedrive Technology Inc."]
default-run = "spacedrive"
default-run = "sd-desktop"
license = { workspace = true }
repository = { workspace = true }
edition = { workspace = true }


@ -42,7 +42,7 @@ export function lockAppTheme(themeType: AppThemeType) {
return invoke()<null>("lock_app_theme", { themeType })
}
export type OpenWithApplication = { url: string; name: string }
export type OpenFilePathResult = { t: "NoLibrary" } | { t: "NoFile"; c: number } | { t: "OpenError"; c: [number, string] } | { t: "AllGood"; c: number } | { t: "Internal"; c: string }
export type AppThemeType = "Auto" | "Light" | "Dark"
export type RevealItem = { Location: { id: number } } | { FilePath: { id: number } }
export type AppThemeType = "Auto" | "Light" | "Dark"
export type OpenFilePathResult = { t: "NoLibrary" } | { t: "NoFile"; c: number } | { t: "OpenError"; c: [number, string] } | { t: "AllGood"; c: number } | { t: "Internal"; c: string }
export type OpenWithApplication = { url: string; name: string }


@ -1,5 +1,5 @@
[package]
name = "server"
name = "sd-server"
version = "0.1.0"
license = { workspace = true }
repository = { workspace = true }


@ -1,9 +1,9 @@
{
"name": "@sd/server",
"version": "0.0.0",
"main": "index.js",
"license": "GPL-3.0-only",
"scripts": {
"dev": "RUST_LOG=\"sd_core=info\" cargo watch -x 'run -p server'"
}
"name": "@sd/server",
"version": "0.0.0",
"main": "index.js",
"license": "GPL-3.0-only",
"scripts": {
"dev": "RUST_LOG=\"sd_core=info\" cargo watch -x 'run -p sd-server'"
}
}


@ -4,7 +4,7 @@ version = "0.0.0"
edition = "2021"
[features]
default = ["emit-messages"]
default = []
emit-messages = []
[dependencies]
@ -19,3 +19,4 @@ serde_json = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }
uhlc = "0.5.2"
slotmap = "1.0.6"


@ -0,0 +1,61 @@
use tokio::sync::mpsc;
pub trait ActorTypes {
type Event;
type Request;
type Handler;
}
pub struct ActorIO<T: ActorTypes> {
pub event_rx: mpsc::Receiver<T::Event>,
pub req_tx: mpsc::Sender<T::Request>,
}
impl<T: ActorTypes> ActorIO<T> {
pub async fn send(&self, value: T::Request) -> Result<(), mpsc::error::SendError<T::Request>> {
self.req_tx.send(value).await
}
}
pub struct HandlerIO<T: ActorTypes> {
handler: T::Handler,
req_rx: mpsc::Receiver<T::Request>,
}
pub type SplitHandlerIO<T> = (
<T as ActorTypes>::Handler,
mpsc::Receiver<<T as ActorTypes>::Request>,
);
impl<T: ActorTypes> HandlerIO<T> {
pub fn split(self) -> SplitHandlerIO<T> {
(self.handler, self.req_rx)
}
}
pub fn create_actor_io<T: ActorTypes>(
make_handler: fn(mpsc::Sender<T::Event>) -> T::Handler,
) -> (ActorIO<T>, HandlerIO<T>) {
let (req_tx, req_rx) = mpsc::channel(20);
let (event_tx, event_rx) = mpsc::channel(20);
(
ActorIO { event_rx, req_tx },
HandlerIO {
handler: make_handler(event_tx),
req_rx,
},
)
}
#[macro_export]
macro_rules! wait {
($rx:expr, $pattern:pat $(=> $expr:expr)?) => {
loop {
match $rx.recv().await {
Some($pattern) => break $($expr)?,
_ => continue
}
}
};
}
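
The plumbing above is generic over an `ActorTypes` implementation: `create_actor_io` builds one bounded channel per direction, hands the actor an `ActorIO` (events in, requests out), and leaves the outside world a `Handler` that owns the event sender. Below is a minimal stand-alone sketch of the same wiring; the `Ping`/`Pong` types and `PrinterHandler` are invented for illustration, and only `tokio` is assumed as a dependency.

use tokio::sync::mpsc;

#[derive(Debug)]
enum Ping {
	Ping, // event the actor consumes
}

#[derive(Debug)]
enum Pong {
	Pong, // request the actor emits for the outside world to handle
}

// Stand-in for the `Handler` that `create_actor_io`'s closure would build.
struct PrinterHandler {
	event_tx: mpsc::Sender<Ping>,
}

#[tokio::main]
async fn main() {
	// Mirrors create_actor_io: one bounded channel per direction.
	let (req_tx, mut req_rx) = mpsc::channel::<Pong>(20);
	let (event_tx, mut event_rx) = mpsc::channel::<Ping>(20);
	let handler = PrinterHandler { event_tx };

	// The actor task: wait for an event, answer with a request.
	tokio::spawn(async move {
		while let Some(Ping::Ping) = event_rx.recv().await {
			req_tx.send(Pong::Pong).await.ok();
		}
	});

	// The outside world talks in through the handler's event sender...
	handler.event_tx.send(Ping::Ping).await.unwrap();
	// ...and services the requests the actor pushes back out.
	assert!(matches!(req_rx.recv().await, Some(Pong::Pong)));
}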


@ -0,0 +1,62 @@
use sd_prisma::prisma::*;
use sd_sync::*;
use uhlc::NTP64;
use uuid::Uuid;
shared_operation::include!(shared_include {
instance: select { pub_id }
});
relation_operation::include!(relation_include {
instance: select { pub_id }
});
pub enum DbOperation {
Shared(shared_include::Data),
Relation(relation_include::Data),
}
impl DbOperation {
pub fn timestamp(&self) -> NTP64 {
NTP64(match self {
Self::Shared(op) => op.timestamp,
Self::Relation(op) => op.timestamp,
} as u64)
}
pub fn id(&self) -> Uuid {
Uuid::from_slice(match self {
Self::Shared(op) => &op.id,
Self::Relation(op) => &op.id,
})
.unwrap()
}
pub fn instance(&self) -> Uuid {
Uuid::from_slice(match self {
Self::Shared(op) => &op.instance.pub_id,
Self::Relation(op) => &op.instance.pub_id,
})
.unwrap()
}
pub fn into_operation(self) -> CRDTOperation {
CRDTOperation {
id: self.id(),
instance: self.instance(),
timestamp: self.timestamp(),
typ: match self {
Self::Shared(op) => CRDTOperationType::Shared(SharedOperation {
record_id: serde_json::from_slice(&op.record_id).unwrap(),
model: op.model,
data: serde_json::from_slice(&op.data).unwrap(),
}),
Self::Relation(op) => CRDTOperationType::Relation(RelationOperation {
relation: op.relation,
data: serde_json::from_slice(&op.data).unwrap(),
relation_item: serde_json::from_slice(&op.item_id).unwrap(),
relation_group: serde_json::from_slice(&op.group_id).unwrap(),
}),
},
}
}
}
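
One detail worth calling out in `timestamp()`: uhlc's `NTP64` is a tuple struct over a `u64` (as the `.0` access elsewhere in this diff relies on), but SQLite only stores signed 64-bit integers, so the write path casts `timestamp.0 as i64` and the read path casts the raw column back `as u64`. A tiny sketch of that round trip, assuming the `uhlc = "0.5.2"` pinned in the Cargo.toml above; the bit-cast is lossless even when the high bit is set.

use uhlc::NTP64;

// Write path (shared_op_db / relation_op_db): u64 bits stored as i64.
fn to_column(t: NTP64) -> i64 {
	t.0 as i64
}

// Read path (DbOperation::timestamp): the same bits, reinterpreted.
fn from_column(raw: i64) -> NTP64 {
	NTP64(raw as u64)
}

fn main() {
	let t = NTP64(u64::MAX - 42); // high bit set: stored as a negative i64
	assert_eq!(from_column(to_column(t)), t); // round trip is lossless
}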


@ -1,34 +1,261 @@
use sd_p2p::{spacetunnel::Tunnel, PeerId};
use sd_sync::CRDTOperation;
use std::{ops::Deref, sync::Arc};
use sd_p2p::spacetunnel::Tunnel;
use sd_prisma::{prisma::*, prisma_sync::ModelSyncData};
use sd_sync::*;
use sd_utils::uuid_to_bytes;
use serde_json::to_vec;
use tokio::sync::mpsc;
use uhlc::NTP64;
use uhlc::{Timestamp, NTP64};
use uuid::Uuid;
use crate::Timestamps;
pub struct Actor {
pub events: mpsc::Sender<Event>,
}
use crate::{actor::*, wait, SharedState};
#[must_use]
/// Stuff that can be handled outside the actor
pub enum Request {
Messages {
tunnel: Tunnel,
timestamps: Vec<(Uuid, NTP64)>,
},
Ingest(Vec<CRDTOperation>),
Ingested,
}
/// Stuff that the actor consumes
#[derive(Debug)]
pub enum Event {
Notification(NotificationEvent),
Messages(MessagesEvent),
}
#[derive(Debug, Default)]
pub enum State {
#[default]
WaitingForNotification,
RetrievingMessages(Tunnel),
Ingesting(MessagesEvent),
}
pub struct Actor {
state: Option<State>,
shared: Arc<SharedState>,
io: ActorIO<Self>,
}
impl Actor {
async fn tick(mut self) -> Option<Self> {
let state = match self.state.take()? {
State::WaitingForNotification => {
let notification = wait!(self.io.event_rx, Event::Notification(n) => n);
State::RetrievingMessages(notification.tunnel)
}
State::RetrievingMessages(tunnel) => {
self.io
.send(Request::Messages {
tunnel,
timestamps: self
.timestamps
.read()
.await
.iter()
.map(|(&k, &v)| (k, v))
.collect(),
})
.await
.ok();
State::Ingesting(wait!(self.io.event_rx, Event::Messages(event) => event))
}
State::Ingesting(event) => {
let count = event.messages.len();
dbg!(&event.messages);
for op in event.messages {
let fut = self.receive_crdt_operation(op);
fut.await;
}
println!("Ingested {count} messages!");
match event.has_more {
true => State::RetrievingMessages(event.tunnel),
false => State::WaitingForNotification,
}
}
};
Some(Self {
state: Some(state),
..self
})
}
pub fn spawn(shared: Arc<SharedState>) -> SplitHandlerIO<Self> {
let (actor_io, handler_io) = create_actor_io::<Self>(|event_tx| Handler { event_tx });
tokio::spawn(async move {
let mut this = Self {
state: Some(Default::default()),
io: actor_io,
shared,
};
loop {
this = match this.tick().await {
Some(this) => this,
None => break,
};
}
});
handler_io.split()
}
async fn receive_crdt_operation(&mut self, op: CRDTOperation) {
self.clock
.update_with_timestamp(&Timestamp::new(op.timestamp, op.instance.into()))
.ok();
let mut timestamp = {
let mut clocks = self.timestamps.write().await;
clocks
.entry(op.instance)
.or_insert_with(|| op.timestamp)
.clone()
};
if timestamp < op.timestamp {
timestamp = op.timestamp;
}
let op_instance = op.instance;
let is_old = self.compare_message(&op).await;
if !is_old {
self.apply_op(op).await.ok();
}
self.db
._transaction()
.run({
let timestamps = self.timestamps.clone();
|db| async move {
match db
.instance()
.update(
instance::pub_id::equals(uuid_to_bytes(op_instance)),
vec![instance::timestamp::set(Some(timestamp.as_u64() as i64))],
)
.exec()
.await
{
Ok(_) => {
timestamps.write().await.insert(op_instance, timestamp);
Ok(())
}
Err(e) => Err(e),
}
}
})
.await
.unwrap();
}
async fn apply_op(&mut self, op: CRDTOperation) -> prisma_client_rust::Result<()> {
ModelSyncData::from_op(op.typ.clone())
.unwrap()
.exec(&self.db)
.await?;
match &op.typ {
CRDTOperationType::Shared(shared_op) => {
shared_op_db(&op, shared_op)
.to_query(&self.db)
.exec()
.await?;
}
CRDTOperationType::Relation(relation_op) => {
relation_op_db(&op, relation_op)
.to_query(&self.db)
.exec()
.await?;
}
}
self.io.req_tx.send(Request::Ingested).await.ok();
Ok(())
}
async fn compare_message(&mut self, op: &CRDTOperation) -> bool {
let old_timestamp = match &op.typ {
CRDTOperationType::Shared(shared_op) => {
let newer_op = self
.db
.shared_operation()
.find_first(vec![
shared_operation::timestamp::gte(op.timestamp.as_u64() as i64),
shared_operation::model::equals(shared_op.model.to_string()),
shared_operation::record_id::equals(
serde_json::to_vec(&shared_op.record_id).unwrap(),
),
shared_operation::kind::equals(shared_op.kind().to_string()),
])
.order_by(shared_operation::timestamp::order(SortOrder::Desc))
.exec()
.await
.unwrap();
newer_op.map(|newer_op| newer_op.timestamp)
}
CRDTOperationType::Relation(relation_op) => {
let newer_op = self
.db
.relation_operation()
.find_first(vec![
relation_operation::timestamp::gte(op.timestamp.as_u64() as i64),
relation_operation::relation::equals(relation_op.relation.to_string()),
relation_operation::item_id::equals(
serde_json::to_vec(&relation_op.relation_item).unwrap(),
),
relation_operation::kind::equals(relation_op.kind().to_string()),
])
.order_by(relation_operation::timestamp::order(SortOrder::Desc))
.exec()
.await
.unwrap();
newer_op.map(|newer_op| newer_op.timestamp)
}
};
old_timestamp
.map(|old| old != op.timestamp.as_u64() as i64)
.unwrap_or_default()
}
}
impl Deref for Actor {
type Target = SharedState;
fn deref(&self) -> &Self::Target {
&self.shared
}
}
pub struct Handler {
pub event_tx: mpsc::Sender<Event>,
}
#[derive(Debug)]
pub struct MessagesEvent {
pub instance_id: Uuid,
pub messages: Vec<CRDTOperation>,
pub has_more: bool,
pub tunnel: Tunnel,
}
#[derive(Debug)]
@ -36,91 +263,39 @@ pub struct NotificationEvent {
pub tunnel: Tunnel,
}
#[derive(Debug)]
pub enum State {
WaitingForNotification,
ExecutingMessagesRequest,
Ingesting,
impl ActorTypes for Actor {
type Event = Event;
type Request = Request;
type Handler = Handler;
}
#[macro_export]
macro_rules! wait {
($rx:ident, $pattern:pat $(=> $expr:expr)?) => {
loop {
match $rx.recv().await {
Some($pattern) => break $($expr)?,
_ => continue
}
}
};
}
impl Actor {
pub fn spawn(timestamps: Timestamps) -> (Self, mpsc::Receiver<Request>) {
let (req_tx, req_rx) = mpsc::channel(4);
let (events_tx, mut events_rx) = mpsc::channel(4);
tokio::spawn(async move {
let mut state = State::WaitingForNotification;
loop {
dbg!(&state);
state = match state {
State::WaitingForNotification => {
let notification = wait!(events_rx, Event::Notification(n) => n);
// req_tx.send(Request::Messages(tunnel, peer_id, 69));
// let notification = wait!(
// events_rx,
// Incoming::Notification(notification) => notification
// );
req_tx
.send(Request::Messages {
tunnel: notification.tunnel,
timestamps: timestamps
.read()
.await
.iter()
.map(|(&k, &v)| (k, v))
.collect(),
})
.await
.ok();
State::ExecutingMessagesRequest
}
State::ExecutingMessagesRequest => {
let event = wait!(events_rx, Event::Messages(event) => event);
req_tx
.send(Request::Ingest(event.messages.clone()))
.await
.ok();
dbg!(&event.messages);
State::Ingesting
}
State::Ingesting => {
println!("Ingested!");
State::WaitingForNotification
}
};
}
});
(Self { events: events_tx }, req_rx)
fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> shared_operation::Create {
shared_operation::Create {
id: op.id.as_bytes().to_vec(),
timestamp: op.timestamp.0 as i64,
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
kind: shared_op.kind().to_string(),
data: to_vec(&shared_op.data).unwrap(),
model: shared_op.model.to_string(),
record_id: to_vec(&shared_op.record_id).unwrap(),
_params: vec![],
}
}
pub async fn notify(&self, tunnel: Tunnel, _peer_id: PeerId) {
self.events
.send(Event::Notification(NotificationEvent { tunnel }))
.await
.ok();
fn relation_op_db(
op: &CRDTOperation,
relation_op: &RelationOperation,
) -> relation_operation::Create {
relation_operation::Create {
id: op.id.as_bytes().to_vec(),
timestamp: op.timestamp.0 as i64,
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
kind: relation_op.kind().to_string(),
data: to_vec(&relation_op.data).unwrap(),
relation: relation_op.relation.to_string(),
item_id: to_vec(&relation_op.relation_item).unwrap(),
group_id: to_vec(&relation_op.relation_group).unwrap(),
_params: vec![],
}
}
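
The rewritten ingest actor is a three-state machine: an `Event::Notification` moves it from `WaitingForNotification` into `RetrievingMessages`, where it emits `Request::Messages` carrying its current timestamps; the reply arrives as `Event::Messages`, the batch is applied in `Ingesting`, and `has_more` decides whether to loop back for another batch or go idle. A stand-alone sketch of just those transitions, with the channels, tunnel, and database stripped out and `u64` standing in for a `CRDTOperation`:

#[derive(Debug)]
enum State {
	WaitingForNotification,
	RetrievingMessages,
	Ingesting(Vec<u64>, bool), // (messages, has_more)
}

// One turn of the actor's tick() loop; `batches` plays the remote peer.
fn tick(state: State, batches: &mut Vec<(Vec<u64>, bool)>) -> State {
	match state {
		// Event::Notification arrived: go ask for messages.
		State::WaitingForNotification => State::RetrievingMessages,
		// Request::Messages was answered with an Event::Messages payload.
		State::RetrievingMessages => {
			let (messages, has_more) = batches.remove(0);
			State::Ingesting(messages, has_more)
		}
		// Apply the batch, then loop or go idle depending on has_more.
		State::Ingesting(messages, has_more) => {
			println!("Ingested {} messages!", messages.len());
			if has_more {
				State::RetrievingMessages
			} else {
				State::WaitingForNotification
			}
		}
	}
}

fn main() {
	// Two rounds: the first reply says more is waiting, the second ends it.
	let mut batches = vec![(vec![1, 2, 3], true), (vec![4], false)];
	let mut state = State::WaitingForNotification;
	for _ in 0..5 {
		state = tick(state, &mut batches);
	}
	assert!(matches!(state, State::WaitingForNotification));
}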


@ -1,10 +1,12 @@
#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Brendan remove this once you've got error handling here
mod actor;
mod db_operation;
pub mod ingest;
mod manager;
use sd_prisma::{prisma::*, prisma_sync::ModelSyncData};
use sd_prisma::prisma::*;
use sd_sync::*;
use sd_utils::uuid_to_bytes;
use std::{
collections::{BTreeMap, HashMap},
@ -12,15 +14,8 @@ use std::{
sync::Arc,
};
use serde_json::to_vec;
use tokio::sync::{
broadcast::{self},
mpsc, RwLock,
};
use uhlc::{HLCBuilder, Timestamp, HLC};
use uuid::Uuid;
pub use sd_prisma::prisma_sync;
pub use ingest::*;
pub use manager::*;
pub use uhlc::NTP64;
#[derive(Clone)]
@ -29,367 +24,29 @@ pub enum SyncMessage {
Created,
}
pub type Timestamps = Arc<RwLock<HashMap<Uuid, NTP64>>>;
pub type Timestamps = Arc<tokio::sync::RwLock<HashMap<uuid::Uuid, NTP64>>>;
pub struct SyncManager {
db: Arc<PrismaClient>,
pub instance: Uuid,
// TODO: Remove `Mutex` and store this on `ingest` actor
timestamps: Timestamps,
clock: HLC,
pub tx: broadcast::Sender<SyncMessage>,
pub ingest: ingest::Actor,
pub struct SharedState {
pub db: Arc<PrismaClient>,
pub instance: uuid::Uuid,
pub timestamps: Timestamps,
pub clock: uhlc::HLC,
}
impl fmt::Debug for SyncManager {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SyncManager").finish()
}
}
pub struct SyncManagerNew {
pub manager: SyncManager,
pub rx: broadcast::Receiver<SyncMessage>,
pub ingest_rx: mpsc::Receiver<ingest::Request>,
}
impl SyncManager {
#[allow(clippy::new_ret_no_self)]
pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> SyncManagerNew {
let (tx, rx) = broadcast::channel(64);
let timestamps: Timestamps = Default::default();
let (ingest, ingest_rx) = ingest::Actor::spawn(timestamps.clone());
SyncManagerNew {
manager: Self {
db: db.clone(),
instance,
clock: HLCBuilder::new().with_id(instance.into()).build(),
timestamps,
tx,
ingest,
},
rx,
ingest_rx,
}
}
pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>(
&self,
tx: &PrismaClient,
(_ops, queries): (Vec<CRDTOperation>, I),
) -> prisma_client_rust::Result<<I as prisma_client_rust::BatchItemParent>::ReturnValue> {
#[cfg(feature = "emit-messages")]
let res = {
macro_rules! variant {
($var:ident, $variant:ident, $fn:ident) => {
let $var = _ops
.iter()
.filter_map(|op| match &op.typ {
CRDTOperationType::$variant(inner) => {
Some($fn(&op, &inner).to_query(tx))
}
_ => None,
})
.collect::<Vec<_>>();
};
}
variant!(shared, Shared, shared_op_db);
variant!(relation, Relation, relation_op_db);
let (res, _) = tx._batch((queries, (shared, relation))).await?;
self.tx.send(SyncMessage::Created).ok();
res
};
#[cfg(not(feature = "emit-messages"))]
let res = tx._batch([queries]).await?.remove(0);
Ok(res)
}
#[allow(unused_variables)]
pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>(
&self,
tx: &PrismaClient,
op: CRDTOperation,
query: Q,
) -> prisma_client_rust::Result<<Q as prisma_client_rust::BatchItemParent>::ReturnValue> {
#[cfg(feature = "emit-messages")]
let ret = {
macro_rules! exec {
($fn:ident, $inner:ident) => {
tx._batch(($fn(&op, $inner).to_query(tx), query)).await?.1
};
}
let ret = match &op.typ {
CRDTOperationType::Shared(inner) => exec!(shared_op_db, inner),
CRDTOperationType::Relation(inner) => exec!(relation_op_db, inner),
};
self.tx.send(SyncMessage::Created).ok();
ret
};
#[cfg(not(feature = "emit-messages"))]
let ret = tx._batch(vec![query]).await?.remove(0);
Ok(ret)
}
pub async fn get_ops(
&self,
args: GetOpsArgs,
) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
let Self { db, .. } = self;
shared_operation::include!(shared_include {
instance: select { pub_id }
});
relation_operation::include!(relation_include {
instance: select { pub_id }
});
enum DbOperation {
Shared(shared_include::Data),
Relation(relation_include::Data),
}
impl DbOperation {
fn timestamp(&self) -> NTP64 {
NTP64(match self {
Self::Shared(op) => op.timestamp,
Self::Relation(op) => op.timestamp,
} as u64)
}
fn id(&self) -> Uuid {
Uuid::from_slice(match self {
Self::Shared(op) => &op.id,
Self::Relation(op) => &op.id,
})
.unwrap()
}
fn instance(&self) -> Uuid {
Uuid::from_slice(match self {
Self::Shared(op) => &op.instance.pub_id,
Self::Relation(op) => &op.instance.pub_id,
})
.unwrap()
}
fn into_operation(self) -> CRDTOperation {
CRDTOperation {
id: self.id(),
instance: self.instance(),
timestamp: self.timestamp(),
typ: match self {
Self::Shared(op) => CRDTOperationType::Shared(SharedOperation {
record_id: serde_json::from_slice(&op.record_id).unwrap(),
model: op.model,
data: serde_json::from_slice(&op.data).unwrap(),
}),
Self::Relation(op) => CRDTOperationType::Relation(RelationOperation {
relation: op.relation,
data: serde_json::from_slice(&op.data).unwrap(),
relation_item: serde_json::from_slice(&op.item_id).unwrap(),
relation_group: serde_json::from_slice(&op.group_id).unwrap(),
}),
},
}
}
}
macro_rules! db_args {
($op:ident) => {
vec![prisma_client_rust::operator::or(
args.clocks
.iter()
.map(|(instance_id, timestamp)| {
prisma_client_rust::and![
$op::instance::is(vec![instance::pub_id::equals(uuid_to_bytes(
*instance_id
))]),
$op::timestamp::gte(timestamp.as_u64() as i64)
]
})
.collect(),
)]
};
}
let (shared, relation) = db
._batch((
db.shared_operation()
.find_many(db_args!(shared_operation))
.take(args.count as i64)
.include(shared_include::include()),
db.relation_operation()
.find_many(db_args!(relation_operation))
.take(args.count as i64)
.include(relation_include::include()),
))
.await?;
let mut ops = BTreeMap::new();
ops.extend(
shared
.into_iter()
.map(DbOperation::Shared)
.map(|op| (op.timestamp(), op)),
);
ops.extend(
relation
.into_iter()
.map(DbOperation::Relation)
.map(|op| (op.timestamp(), op)),
);
Ok(ops
.into_values()
.rev()
.take(args.count as usize)
.map(DbOperation::into_operation)
.collect())
}
pub async fn apply_op(&self, op: CRDTOperation) -> prisma_client_rust::Result<()> {
ModelSyncData::from_op(op.typ.clone())
.unwrap()
.exec(&self.db)
.await?;
match &op.typ {
CRDTOperationType::Shared(shared_op) => {
shared_op_db(&op, shared_op)
.to_query(&self.db)
.exec()
.await?;
}
CRDTOperationType::Relation(relation_op) => {
relation_op_db(&op, relation_op)
.to_query(&self.db)
.exec()
.await?;
}
}
self.tx.send(SyncMessage::Ingested).ok();
Ok(())
}
async fn compare_message(&self, op: &CRDTOperation) -> bool {
let old_timestamp = match &op.typ {
CRDTOperationType::Shared(shared_op) => {
let newer_op = self
.db
.shared_operation()
.find_first(vec![
shared_operation::timestamp::gte(op.timestamp.as_u64() as i64),
shared_operation::model::equals(shared_op.model.to_string()),
shared_operation::record_id::equals(
serde_json::to_vec(&shared_op.record_id).unwrap(),
),
shared_operation::kind::equals(shared_op.kind().to_string()),
])
.order_by(shared_operation::timestamp::order(SortOrder::Desc))
.exec()
.await
.unwrap();
newer_op.map(|newer_op| newer_op.timestamp)
}
CRDTOperationType::Relation(relation_op) => {
let newer_op = self
.db
.relation_operation()
.find_first(vec![
relation_operation::timestamp::gte(op.timestamp.as_u64() as i64),
relation_operation::relation::equals(relation_op.relation.to_string()),
relation_operation::item_id::equals(
serde_json::to_vec(&relation_op.relation_item).unwrap(),
),
relation_operation::kind::equals(relation_op.kind().to_string()),
])
.order_by(relation_operation::timestamp::order(SortOrder::Desc))
.exec()
.await
.unwrap();
newer_op.map(|newer_op| newer_op.timestamp)
}
};
old_timestamp
.map(|old| old != op.timestamp.as_u64() as i64)
.unwrap_or_default()
}
pub async fn receive_crdt_operation(&self, op: CRDTOperation) {
self.clock
.update_with_timestamp(&Timestamp::new(op.timestamp, op.instance.into()))
.ok();
let mut clocks = self.timestamps.write().await;
let timestamp = clocks.entry(op.instance).or_insert_with(|| op.timestamp);
if *timestamp < op.timestamp {
*timestamp = op.timestamp;
}
let op_timestamp = op.timestamp;
let op_instance = op.instance;
let is_old = self.compare_message(&op).await;
if !is_old {
self.apply_op(op).await.ok();
}
self.db
.instance()
.update(
instance::pub_id::equals(uuid_to_bytes(op_instance)),
vec![instance::timestamp::set(Some(op_timestamp.as_u64() as i64))],
)
.exec()
.await
.ok();
}
pub async fn register_instance(&self, instance_id: Uuid) {
self.timestamps.write().await.insert(instance_id, NTP64(0));
}
}
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
pub struct GetOpsArgs {
pub clocks: Vec<(Uuid, NTP64)>,
pub count: u32,
}
fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> shared_operation::Create {
pub fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> shared_operation::Create {
shared_operation::Create {
id: op.id.as_bytes().to_vec(),
timestamp: op.timestamp.0 as i64,
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
kind: shared_op.kind().to_string(),
data: to_vec(&shared_op.data).unwrap(),
data: serde_json::to_vec(&shared_op.data).unwrap(),
model: shared_op.model.to_string(),
record_id: to_vec(&shared_op.record_id).unwrap(),
record_id: serde_json::to_vec(&shared_op.record_id).unwrap(),
_params: vec![],
}
}
fn relation_op_db(
pub fn relation_op_db(
op: &CRDTOperation,
relation_op: &RelationOperation,
) -> relation_operation::Create {
@ -398,20 +55,10 @@ fn relation_op_db(
timestamp: op.timestamp.0 as i64,
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
kind: relation_op.kind().to_string(),
data: to_vec(&relation_op.data).unwrap(),
data: serde_json::to_vec(&relation_op.data).unwrap(),
relation: relation_op.relation.to_string(),
item_id: to_vec(&relation_op.relation_item).unwrap(),
group_id: to_vec(&relation_op.relation_group).unwrap(),
item_id: serde_json::to_vec(&relation_op.relation_item).unwrap(),
group_id: serde_json::to_vec(&relation_op.relation_group).unwrap(),
_params: vec![],
}
}
impl OperationFactory for SyncManager {
fn get_clock(&self) -> &HLC {
&self.clock
}
fn get_instance(&self) -> Uuid {
self.instance
}
}


@ -0,0 +1,206 @@
use sd_prisma::prisma::*;
use sd_sync::*;
use sd_utils::uuid_to_bytes;
use crate::{db_operation::*, *};
use std::{cmp::Ordering, ops::Deref, sync::Arc};
use tokio::sync::{broadcast, mpsc};
use uhlc::{HLCBuilder, HLC};
use uuid::Uuid;
pub struct Manager {
pub tx: broadcast::Sender<SyncMessage>,
pub ingest: ingest::Handler,
shared: Arc<SharedState>,
}
pub struct SyncManagerNew {
pub manager: Manager,
pub rx: broadcast::Receiver<SyncMessage>,
pub ingest_rx: mpsc::Receiver<ingest::Request>,
}
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
pub struct GetOpsArgs {
pub clocks: Vec<(Uuid, NTP64)>,
pub count: u32,
}
impl Manager {
pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> SyncManagerNew {
let (tx, rx) = broadcast::channel(64);
let timestamps: Timestamps = Default::default();
let clock = HLCBuilder::new().with_id(instance.into()).build();
let shared = Arc::new(SharedState {
db: db.clone(),
instance,
timestamps,
clock,
});
let (ingest, ingest_rx) = ingest::Actor::spawn(shared.clone());
SyncManagerNew {
manager: Self { shared, tx, ingest },
rx,
ingest_rx,
}
}
pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>(
&self,
tx: &PrismaClient,
(_ops, queries): (Vec<CRDTOperation>, I),
) -> prisma_client_rust::Result<<I as prisma_client_rust::BatchItemParent>::ReturnValue> {
#[cfg(feature = "emit-messages")]
let res = {
macro_rules! variant {
($var:ident, $variant:ident, $fn:ident) => {
let $var = _ops
.iter()
.filter_map(|op| match &op.typ {
CRDTOperationType::$variant(inner) => {
Some($fn(&op, &inner).to_query(tx))
}
_ => None,
})
.collect::<Vec<_>>();
};
}
variant!(shared, Shared, shared_op_db);
variant!(relation, Relation, relation_op_db);
let (res, _) = tx._batch((queries, (shared, relation))).await?;
self.tx.send(SyncMessage::Created).ok();
res
};
#[cfg(not(feature = "emit-messages"))]
let res = tx._batch([queries]).await?.remove(0);
Ok(res)
}
#[allow(unused_variables)]
pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>(
&self,
tx: &PrismaClient,
op: CRDTOperation,
query: Q,
) -> prisma_client_rust::Result<<Q as prisma_client_rust::BatchItemParent>::ReturnValue> {
#[cfg(feature = "emit-messages")]
let ret = {
macro_rules! exec {
($fn:ident, $inner:ident) => {
tx._batch(($fn(&op, $inner).to_query(tx), query)).await?.1
};
}
let ret = match &op.typ {
CRDTOperationType::Shared(inner) => exec!(shared_op_db, inner),
CRDTOperationType::Relation(inner) => exec!(relation_op_db, inner),
};
self.tx.send(SyncMessage::Created).ok();
ret
};
#[cfg(not(feature = "emit-messages"))]
let ret = tx._batch(vec![query]).await?.remove(0);
Ok(ret)
}
pub async fn get_ops(
&self,
args: GetOpsArgs,
) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
let db = &self.db;
macro_rules! db_args {
($args:ident, $op:ident) => {
vec![prisma_client_rust::operator::or(
$args
.clocks
.iter()
.map(|(instance_id, timestamp)| {
prisma_client_rust::and![
$op::instance::is(vec![instance::pub_id::equals(uuid_to_bytes(
*instance_id
))]),
$op::timestamp::gt(timestamp.as_u64() as i64)
]
})
.chain([
$op::instance::is_not(vec![
instance::pub_id::in_vec(
$args
.clocks
.iter()
.map(|(instance_id, _)| {
uuid_to_bytes(*instance_id)
})
.collect()
)
])
])
.collect(),
)]
};
}
let (shared, relation) = db
._batch((
db.shared_operation()
.find_many(db_args!(args, shared_operation))
.take(args.count as i64)
.order_by(shared_operation::timestamp::order(SortOrder::Asc))
.include(shared_include::include()),
db.relation_operation()
.find_many(db_args!(args, relation_operation))
.take(args.count as i64)
.order_by(relation_operation::timestamp::order(SortOrder::Asc))
.include(relation_include::include()),
))
.await?;
let mut ops: Vec<_> = []
.into_iter()
.chain(shared.into_iter().map(DbOperation::Shared))
.chain(relation.into_iter().map(DbOperation::Relation))
.collect();
ops.sort_by(|a, b| match a.timestamp().cmp(&b.timestamp()) {
Ordering::Equal => a.instance().cmp(&b.instance()),
o => o,
});
Ok(ops
.into_iter()
.take(args.count as usize)
.map(DbOperation::into_operation)
.collect())
}
}
impl OperationFactory for Manager {
fn get_clock(&self) -> &HLC {
&self.clock
}
fn get_instance(&self) -> Uuid {
self.instance
}
}
impl Deref for Manager {
type Target = SharedState;
fn deref(&self) -> &Self::Target {
&self.shared
}
}
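
`get_ops` now fetches both operation tables in one batch, each ordered ascending by timestamp, then merges them and breaks timestamp ties by instance id, so every peer pages through the log in the same deterministic order. A sketch of that merge step with plain tuples in place of `DbOperation` (timestamps as `u64`, instance ids as strings):

fn main() {
	// (timestamp, instance) pairs, each list already ordered ascending,
	// like the shared_operation and relation_operation query results.
	let shared = vec![(1u64, "a"), (3, "a")];
	let relation = vec![(2u64, "b"), (3, "b")];

	// Chain the two tables, then sort by (timestamp, instance) so a
	// timestamp tie across tables still yields one global order.
	let mut ops: Vec<_> = shared.into_iter().chain(relation).collect();
	ops.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));

	// Apply GetOpsArgs::count after merging, as get_ops does.
	let page: Vec<_> = ops.into_iter().take(3).collect();
	assert_eq!(page, vec![(1, "a"), (2, "b"), (3, "a")]);
}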


@ -18,7 +18,7 @@ struct Instance {
id: Uuid,
_peer_id: sd_p2p::PeerId,
db: Arc<prisma::PrismaClient>,
sync: Arc<SyncManager>,
sync: Arc<sd_core_sync::Manager>,
}
impl Instance {
@ -54,7 +54,7 @@ impl Instance {
.await
.unwrap();
let sync = sd_core_sync::SyncManager::new(&db, id);
let sync = sd_core_sync::Manager::new(&db, id);
(
Arc::new(Self {
@ -105,9 +105,6 @@ impl Instance {
.exec()
.await
.unwrap();
left.sync.register_instance(right.id).await;
right.sync.register_instance(left.id).await;
}
}
@ -125,15 +122,9 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
async move {
while let Ok(msg) = sync_rx1.recv().await {
match msg {
SyncMessage::Created => instance2
.sync
.ingest
.events
.send(ingest::Event::Notification(ingest::NotificationEvent {
tunnel: todo!(),
}))
.await
.unwrap(),
SyncMessage::Created => {
instance2.sync.ingest.event_tx.send(todo!()).await.unwrap()
}
_ => {}
}
}
@ -160,9 +151,11 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
instance2
.sync
.ingest
.events
.event_tx
.send(ingest::Event::Messages(ingest::MessagesEvent {
tunnel: todo!(),
messages,
has_more: false,
instance_id: instance1.id,
}))
.await


@ -6,7 +6,7 @@ datasource db {
generator client {
provider = "cargo prisma"
output = "../../crates/prisma/src/prisma"
module_path = "sd_prisma::prisma"
module_path = "prisma"
client_format = "folder"
}
@ -91,6 +91,7 @@ model Instance {
SharedOperation SharedOperation[]
RelationOperation RelationOperation[]
locations Location[]
@@map("instance")
}
@ -141,7 +142,7 @@ model Location {
date_created DateTime?
instance_id Int?
// instance Instance? @relation(fields: [instance_id], references: [id]) // TODO: Enabling this breaks migration 7's `update_many` in `library/config.rs`
instance Instance? @relation(fields: [instance_id], references: [id], onDelete: SetNull)
file_paths FilePath[]
indexer_rules IndexerRulesInLocation[]


@ -46,6 +46,8 @@ pub(crate) mod preferences;
pub mod util;
pub(crate) mod volume;
pub(crate) use sd_core_sync as sync;
/// Holds references to all the services that make up the Spacedrive core.
/// This can easily be passed around as a context to the rest of the core.
pub struct NodeServices {


@ -6,6 +6,7 @@ use crate::{
location::file_path_helper::{file_path_to_full_path, IsolatedFilePathData},
object::{orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path},
prisma::{file_path, location, PrismaClient},
sync,
util::{db::maybe_missing, error::FileIOError},
Node, NotificationManager,
};
@ -18,7 +19,7 @@ use std::{
};
use chrono::{DateTime, Utc};
use sd_core_sync::{SyncManager, SyncMessage};
use sd_core_sync::SyncMessage;
use sd_p2p::spacetunnel::Identity;
use sd_prisma::prisma::notification;
use tokio::{fs, io, sync::broadcast};
@ -42,7 +43,7 @@ pub struct LoadedLibrary {
pub config: LibraryConfig,
/// db holds the database client for the current library.
pub db: Arc<PrismaClient>,
pub sync: Arc<sd_core_sync::SyncManager>,
pub sync: Arc<sync::Manager>,
/// key manager that provides encryption keys to functions that require them
// pub key_manager: Arc<KeyManager>,
/// p2p identity
@ -78,7 +79,7 @@ impl LoadedLibrary {
manager: Arc<LibraryManager>,
node: &Arc<Node>,
) -> Arc<Self> {
let mut sync = SyncManager::new(&db, instance_id);
let mut sync = sync::Manager::new(&db, instance_id);
let library = Arc::new(Self {
id,
@ -106,24 +107,31 @@ impl LoadedLibrary {
loop {
tokio::select! {
req = sync.ingest_rx.recv() => {
use sd_core_sync::ingest::Request;
use sd_core_sync::ingest;
let Some(req) = req else { continue; };
const OPS_PER_REQUEST: u32 = 100;
match req {
Request::Messages { tunnel, timestamps } => {
node.nlm.request_and_ingest_ops(
tunnel,
sd_core_sync::GetOpsArgs { clocks: timestamps, count: 100 },
&library.sync,
library.id
ingest::Request::Messages { mut tunnel, timestamps } => {
let ops = library.node().nlm.request_ops(
&mut tunnel,
sd_core_sync::GetOpsArgs { clocks: timestamps, count: OPS_PER_REQUEST },
).await;
library.sync.ingest
.event_tx
.send(ingest::Event::Messages(ingest::MessagesEvent {
tunnel,
instance_id: library.sync.instance,
has_more: ops.len() == OPS_PER_REQUEST as usize,
messages: ops,
}))
.await
.expect("TODO: Handle ingest channel closed, so we don't loose ops");
},
Request::Ingest(ops) => {
for op in ops.into_iter() {
library.sync.receive_crdt_operation(op).await;
}
}
_ => {}
}
},
msg = sync.rx.recv() => {


@ -110,7 +110,7 @@ async fn execute_indexer_save_step(
(
location::NAME,
json!(prisma_sync::location::SyncId {
pub_id: pub_id.clone()
pub_id: location.pub_id.clone()
}),
),
location_id::set(Some(location.id)),


@ -24,6 +24,7 @@ use normpath::PathExt;
use prisma_client_rust::{operator::and, or, QueryError};
use sd_prisma::prisma_sync;
use sd_sync::*;
use sd_utils::uuid_to_bytes;
use serde::Deserialize;
use serde_json::json;
use specta::Type;
@ -600,10 +601,9 @@ async fn create_location(
(location::path::NAME, json!(&location_path)),
(location::date_created::NAME, json!(date_created)),
(
location::instance_id::NAME,
location::instance::NAME,
json!(prisma_sync::instance::SyncId {
pub_id: vec![],
// id: library.config.instance_id,
pub_id: uuid_to_bytes(library.sync.instance)
}),
),
],
@ -741,7 +741,7 @@ impl From<location_with_indexer_rules::Data> for location::Data {
date_created: data.date_created,
file_paths: None,
indexer_rules: None,
// instance: None,
instance: None,
}
}
}
@ -763,7 +763,7 @@ impl From<&location_with_indexer_rules::Data> for location::Data {
date_created: data.date_created,
file_paths: None,
indexer_rules: None,
// instance: None,
instance: None,
}
}
}


@ -9,7 +9,6 @@ use crate::{
util::{db::maybe_missing, error::FileIOError},
};
use sd_core_sync::SyncManager;
use sd_file_ext::{extensions::Extension, kind::ObjectKind};
use sd_prisma::prisma_sync;
use sd_sync::{CRDTOperation, OperationFactory};
@ -308,7 +307,7 @@ async fn identifier_job_step(
fn file_path_object_connect_ops<'db>(
file_path_id: Uuid,
object_id: Uuid,
sync: &SyncManager,
sync: &crate::sync::Manager,
db: &'db PrismaClient,
) -> (CRDTOperation, file_path::UpdateQuery<'db>) {
#[cfg(debug_assertions)]


@ -259,6 +259,7 @@ impl P2PManager {
}
Header::Sync(library_id) => {
// Header -> Tunnel -> SyncMessage
use sd_core_sync::ingest;
let mut tunnel = Tunnel::responder(stream).await.unwrap();
@ -276,16 +277,16 @@ impl P2PManager {
SyncMessage::NewOperations => {
// This ends up in `NetworkedLibraryManager::request_and_ingest_ops`.
// TODO: Throwing the tunnel around like this makes it soooo confusing.
ingest.notify(tunnel, event.peer_id).await;
ingest
.event_tx
.send(ingest::Event::Notification(
ingest::NotificationEvent { tunnel },
))
.await
.ok();
}
SyncMessage::OperationsRequest(_) => {
nlm.exchange_sync_ops(
tunnel,
&event.peer_id,
library_id,
&library.sync,
)
.await;
todo!("this should be received somewhere else!");
}
SyncMessage::OperationsRequestResponse(_) => {
todo!("unreachable but add proper error handling")


@ -1,376 +0,0 @@
use sd_prisma::prisma::*;
// TODO: Turn this entire file into a Prisma generator cause it could be way more maintainable
// Pairing will fail if the two clients aren't on versions with identical DB models so it's safe to send them and ignore migrations.
const ITEMS_PER_BATCH: i64 = 1000;
macro_rules! impl_for_models {
($($variant:ident($model:ident)),* $(,)+) => {
/// Represents any DB model to be ingested into the database as part of the initial sync
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum ModelData {
$(
$variant(Vec<$model::Data>),
)*
}
impl ModelData {
/// Length of data
pub fn len(&self) -> usize {
match self {
$(
Self::$variant(data) => data.len(),
)*
}
}
/// Get count of all of the rows in the database
pub async fn total_count(db: &PrismaClient) -> Result<i64, prisma_client_rust::QueryError> {
let mut total_count = 0;
let ($( $model ),*) = tokio::join!(
$(
db.$model().count(vec![]).exec(),
)*
);
$(total_count += $model?;)*
Ok(total_count)
}
/// Insert the data into the database
pub async fn insert(self, db: &PrismaClient) -> Result<(), prisma_client_rust::QueryError> {
match self {
$(
Self::$variant(data) => {
// TODO: Prisma Client Rust is broke
// db.$model().create_many(data.into_iter().map(|v| FromData(v).into()).collect()).exec().await?;
for i in data {
$model::CreateUnchecked::from(FromData(i)).to_query(db).exec().await?;
}
}
)*
}
Ok(())
}
pub fn model_name(&self) -> &'static str {
match self {
$(
Self::$variant(_) => stringify!($model),
)*
}
}
}
/// This exists to determine the next model to sync.
/// It emulates `.window()` functionality but for a `macro_rules`
// TODO: When replacing with a generator this can be removed and done at compile time
#[derive(Debug)]
enum ModelSyncCursorIterator {
Done = 0,
$(
$variant,
)*
}
impl<'a> From<&'a ModelSyncCursor> for ModelSyncCursorIterator {
fn from(cursor: &'a ModelSyncCursor) -> Self {
match cursor {
$(
ModelSyncCursor::$variant(_) => Self::$variant,
)*
ModelSyncCursor::Done => Self::Done,
}
}
}
impl ModelSyncCursorIterator {
pub fn next(self) -> ModelSyncCursor {
let i = self as i32;
match i + 1 {
$(
v if v == Self::$variant as i32 => ModelSyncCursor::$variant(0),
)*
_ => ModelSyncCursor::Done,
}
}
}
/// Represents where we are up to with the sync
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum ModelSyncCursor {
$(
$variant(i64),
)*
Done,
}
impl ModelSyncCursor {
pub fn new() -> Self {
new_impl!($( $variant ),*)
}
pub async fn next(&mut self, db: &PrismaClient) -> Option<Result<ModelData, prisma_client_rust::QueryError>> {
match self {
$(
Self::$variant(cursor) => {
match db.$model()
.find_many(vec![])
.skip(*cursor)
.take(ITEMS_PER_BATCH + 1)
.exec()
.await {
Ok(data) => {
if data.len() <= ITEMS_PER_BATCH as usize {
*self = ModelSyncCursorIterator::from(&*self).next();
} else {
*self = Self::$variant(*cursor + ITEMS_PER_BATCH);
}
Some(Ok(ModelData::$variant(data)))
},
Err(e) => return Some(Err(e)),
}
},
)*
Self::Done => None
}
}
}
};
}
macro_rules! new_impl {
($x:ident, $($y:ident),+) => {
Self::$x(0)
};
}
impl PartialEq for ModelData {
// Crude EQ impl based only on IDs, not struct content.
// It's super annoying PCR doesn't have this impl but it kinda makes sense with relation fetching.
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(ModelData::SharedOperation(a), ModelData::SharedOperation(b)) => a
.iter()
.map(|x| x.id.clone())
.eq(b.iter().map(|x| x.id.clone())),
(ModelData::Volume(a), ModelData::Volume(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::Location(a), ModelData::Location(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::FilePath(a), ModelData::FilePath(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::Object(a), ModelData::Object(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::Tag(a), ModelData::Tag(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::TagOnObject(a), ModelData::TagOnObject(b)) => a
.iter()
.map(|x| (x.tag_id, x.object_id))
.eq(b.iter().map(|x| (x.tag_id, x.object_id))),
(ModelData::IndexerRule(a), ModelData::IndexerRule(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::IndexerRulesInLocation(a), ModelData::IndexerRulesInLocation(b)) => a
.iter()
.map(|x| (x.location_id, x.indexer_rule_id))
.eq(b.iter().map(|x| (x.location_id, x.indexer_rule_id))),
(ModelData::Preference(a), ModelData::Preference(b)) => a
.iter()
.map(|x| (x.key.clone(), x.value.clone()))
.eq(b.iter().map(|x| (x.key.clone(), x.value.clone()))),
_ => false,
}
}
}
/// Meaningless wrapper to avoid Rust's orphan rule
struct FromData<T>(T);
impl From<FromData<shared_operation::Data>> for shared_operation::CreateUnchecked {
fn from(FromData(data): FromData<shared_operation::Data>) -> Self {
Self {
id: data.id,
timestamp: data.timestamp,
model: data.model,
record_id: data.record_id,
kind: data.kind,
data: data.data,
instance_id: data.instance_id,
_params: vec![],
}
}
}
impl From<FromData<volume::Data>> for volume::CreateUnchecked {
fn from(FromData(data): FromData<volume::Data>) -> Self {
Self {
name: data.name,
mount_point: data.mount_point,
_params: vec![
volume::id::set(data.id),
volume::total_bytes_capacity::set(data.total_bytes_capacity),
volume::total_bytes_available::set(data.total_bytes_available),
volume::disk_type::set(data.disk_type),
volume::filesystem::set(data.filesystem),
volume::is_system::set(data.is_system),
volume::date_modified::set(data.date_modified),
],
}
}
}
impl From<FromData<location::Data>> for location::CreateUnchecked {
fn from(FromData(data): FromData<location::Data>) -> Self {
Self {
pub_id: data.pub_id,
_params: vec![
location::id::set(data.id),
location::name::set(data.name),
location::path::set(data.path),
location::total_capacity::set(data.total_capacity),
location::available_capacity::set(data.available_capacity),
location::is_archived::set(data.is_archived),
location::generate_preview_media::set(data.generate_preview_media),
location::sync_preview_media::set(data.sync_preview_media),
location::hidden::set(data.hidden),
location::date_created::set(data.date_created),
location::instance_id::set(data.instance_id),
],
}
}
}
impl From<FromData<file_path::Data>> for file_path::CreateUnchecked {
fn from(FromData(data): FromData<file_path::Data>) -> Self {
Self {
pub_id: data.pub_id,
_params: vec![
file_path::id::set(data.id),
file_path::is_dir::set(data.is_dir),
file_path::cas_id::set(data.cas_id),
file_path::integrity_checksum::set(data.integrity_checksum),
file_path::location_id::set(data.location_id),
file_path::materialized_path::set(data.materialized_path),
file_path::name::set(data.name),
file_path::extension::set(data.extension),
file_path::size_in_bytes::set(data.size_in_bytes),
file_path::size_in_bytes_bytes::set(data.size_in_bytes_bytes),
file_path::inode::set(data.inode),
file_path::device::set(data.device),
file_path::object_id::set(data.object_id),
file_path::key_id::set(data.key_id),
file_path::date_created::set(data.date_created),
file_path::date_modified::set(data.date_modified),
file_path::date_indexed::set(data.date_indexed),
],
}
}
}
impl From<FromData<object::Data>> for object::CreateUnchecked {
fn from(FromData(data): FromData<object::Data>) -> Self {
Self {
pub_id: data.pub_id,
_params: vec![
object::id::set(data.id),
object::kind::set(data.kind),
object::key_id::set(data.key_id),
object::hidden::set(data.hidden),
object::favorite::set(data.favorite),
object::important::set(data.important),
object::note::set(data.note),
object::date_created::set(data.date_created),
object::date_accessed::set(data.date_accessed),
],
}
}
}
impl From<FromData<tag::Data>> for tag::CreateUnchecked {
fn from(FromData(data): FromData<tag::Data>) -> Self {
Self {
pub_id: data.pub_id,
_params: vec![
tag::id::set(data.id),
tag::name::set(data.name),
tag::color::set(data.color),
tag::redundancy_goal::set(data.redundancy_goal),
tag::date_created::set(data.date_created),
tag::date_modified::set(data.date_modified),
],
}
}
}
impl From<FromData<tag_on_object::Data>> for tag_on_object::CreateUnchecked {
fn from(FromData(data): FromData<tag_on_object::Data>) -> Self {
Self {
tag_id: data.tag_id,
object_id: data.object_id,
_params: vec![],
}
}
}
impl From<FromData<indexer_rule::Data>> for indexer_rule::CreateUnchecked {
fn from(FromData(data): FromData<indexer_rule::Data>) -> Self {
Self {
pub_id: data.pub_id,
_params: vec![
indexer_rule::id::set(data.id),
indexer_rule::name::set(data.name),
indexer_rule::default::set(data.default),
indexer_rule::rules_per_kind::set(data.rules_per_kind),
indexer_rule::date_created::set(data.date_created),
indexer_rule::date_modified::set(data.date_modified),
],
}
}
}
impl From<FromData<indexer_rules_in_location::Data>>
for indexer_rules_in_location::CreateUnchecked
{
fn from(FromData(data): FromData<indexer_rules_in_location::Data>) -> Self {
Self {
location_id: data.location_id,
indexer_rule_id: data.indexer_rule_id,
_params: vec![],
}
}
}
impl From<FromData<preference::Data>> for preference::CreateUnchecked {
fn from(FromData(data): FromData<preference::Data>) -> Self {
Self {
key: data.key,
_params: vec![preference::value::set(data.value)],
}
}
}
// Ensure you order the models so that foreign keys are created before the models that reference them.
impl_for_models! {
Object(object),
SharedOperation(shared_operation),
Volume(volume),
Location(location),
FilePath(file_path),
Tag(tag),
TagOnObject(tag_on_object),
IndexerRule(indexer_rule),
IndexerRulesInLocation(indexer_rules_in_location),
Preference(preference),
}


@ -22,10 +22,8 @@ use tokio::{
use tracing::{error, info};
use uuid::Uuid;
mod initial_sync;
mod proto;
pub use initial_sync::*;
use proto::*;
use crate::{
@ -79,7 +77,157 @@ impl PairingManager {
node_config: NodeConfig,
library_manager: Arc<LibraryManager>,
) -> u16 {
todo!();
// TODO: Timeout for max number of pairings in a time period
let pairing_id = self.id.fetch_add(1, Ordering::SeqCst);
self.emit_progress(pairing_id, PairingStatus::EstablishingConnection);
info!("Beginning pairing '{pairing_id}' as originator to remote peer '{peer_id}'");
tokio::spawn(async move {
let mut stream = self.manager.stream(peer_id).await.unwrap();
stream.write_all(&Header::Pair.to_bytes()).await.unwrap();
// TODO: Ensure both clients are on a compatible version cause Prisma model changes will cause issues
// 1. Create new instance for originator and send it to the responder
self.emit_progress(pairing_id, PairingStatus::PairingRequested);
let now = Utc::now();
let identity = Identity::new();
let self_instance_id = Uuid::new_v4();
let req = PairingRequest(Instance {
id: self_instance_id,
identity: identity.to_remote_identity(),
node_id: node_config.id,
node_name: node_config.name.clone(),
node_platform: Platform::current(),
last_seen: now,
date_created: now,
});
stream.write_all(&req.to_bytes()).await.unwrap();
// 2.
match PairingResponse::from_stream(&mut stream).await.unwrap() {
PairingResponse::Accepted {
library_id,
library_name,
library_description,
instances,
} => {
info!("Pairing '{pairing_id}' accepted by remote into library '{library_id}'");
// TODO: Log all instances and library info
self.emit_progress(
pairing_id,
PairingStatus::PairingInProgress {
library_name: library_name.clone(),
library_description: library_description.clone(),
},
);
// TODO: Future - Library in pairing state
// TODO: Create library
if library_manager
.get_all_libraries()
.await
.into_iter()
.find(|i| i.id == library_id)
.is_some()
{
self.emit_progress(pairing_id, PairingStatus::LibraryAlreadyExists);
// TODO: Properly handle this at a protocol level so the error is on both sides
return;
}
let (this, instances): (Vec<_>, Vec<_>) = instances
.into_iter()
.partition(|i| i.id == self_instance_id);
if this.len() != 1 {
todo!("error handling");
}
let this = this.first().expect("unreachable");
if this.identity != identity.to_remote_identity() {
todo!("error handling. Something went really wrong!");
}
let library = library_manager
.create_with_uuid(
library_id,
LibraryName::new(library_name).unwrap(),
library_description,
node_config,
false, // We will sync everything which will conflict with the seeded stuff
Some(instance::Create {
pub_id: this.id.as_bytes().to_vec(),
identity: IdentityOrRemoteIdentity::Identity(identity).to_bytes(),
node_id: this.node_id.as_bytes().to_vec(),
node_name: this.node_name.clone(), // TODO: Remove `clone`
node_platform: this.node_platform as i32,
last_seen: this.last_seen.into(),
date_created: this.date_created.into(),
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![],
}),
)
.await
.unwrap();
let library = library_manager.get_library(&library.id).await.unwrap();
library
.db
.instance()
.create_many(
instances
.into_iter()
.map(|i| {
instance::CreateUnchecked {
pub_id: i.id.as_bytes().to_vec(),
identity: IdentityOrRemoteIdentity::RemoteIdentity(
i.identity,
)
.to_bytes(),
node_id: i.node_id.as_bytes().to_vec(),
node_name: i.node_name,
node_platform: i.node_platform as i32,
last_seen: i.last_seen.into(),
date_created: i.date_created.into(),
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![],
}
})
.collect(),
)
.exec()
.await
.unwrap();
// Called again so the new instances are picked up
library_manager.node.nlm.load_library(&library).await;
P2PManager::resync(
library_manager.node.nlm.clone(),
&mut stream,
peer_id,
self.metadata_manager.get().instances,
)
.await;
// TODO: Done message to frontend
self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id));
stream.flush().await.unwrap();
}
PairingResponse::Rejected => {
info!("Pairing '{pairing_id}' rejected by remote");
self.emit_progress(pairing_id, PairingStatus::PairingRejected);
}
}
});
pairing_id
}
pub async fn responder(
@ -88,7 +236,116 @@ impl PairingManager {
mut stream: impl AsyncRead + AsyncWrite + Unpin,
library_manager: &LibraryManager,
) {
todo!();
let pairing_id = self.id.fetch_add(1, Ordering::SeqCst);
self.emit_progress(pairing_id, PairingStatus::EstablishingConnection);
info!("Beginning pairing '{pairing_id}' as responder to remote peer '{peer_id}'");
let remote_instance = PairingRequest::from_stream(&mut stream).await.unwrap().0;
self.emit_progress(pairing_id, PairingStatus::PairingDecisionRequest);
self.events_tx
.send(P2PEvent::PairingRequest {
id: pairing_id,
name: remote_instance.node_name.clone(),
os: remote_instance.node_platform.clone().into(),
})
.ok();
// Prompt the user and wait
// TODO: After 1 minute remove channel from map and assume it was rejected
let (tx, rx) = oneshot::channel();
self.pairing_response
.write()
.unwrap()
.insert(pairing_id, tx);
let PairingDecision::Accept(library_id) = rx.await.unwrap() else {
info!("The user rejected pairing '{pairing_id}'!");
// self.emit_progress(pairing_id, PairingStatus::PairingRejected); // TODO: Event to remove from frontend index
stream.write_all(&PairingResponse::Rejected.to_bytes()).await.unwrap();
return;
};
info!("The user accepted pairing '{pairing_id}' for library '{library_id}'!");
let library = library_manager.get_library(&library_id).await.unwrap();
// TODO: Rollback this on pairing failure
instance::Create {
pub_id: remote_instance.id.as_bytes().to_vec(),
identity: IdentityOrRemoteIdentity::RemoteIdentity(remote_instance.identity.clone())
.to_bytes(),
node_id: remote_instance.node_id.as_bytes().to_vec(),
node_name: remote_instance.node_name,
node_platform: remote_instance.node_platform as i32,
last_seen: remote_instance.last_seen.into(),
date_created: remote_instance.date_created.into(),
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![],
}
.to_query(&library.db)
.exec()
.await
.unwrap();
stream
.write_all(
&PairingResponse::Accepted {
library_id: library.id,
library_name: library.config.name.clone().into(),
library_description: library.config.description.clone(),
instances: library
.db
.instance()
.find_many(vec![])
.exec()
.await
.unwrap()
.into_iter()
.map(|i| Instance {
id: Uuid::from_slice(&i.pub_id).unwrap(),
identity: IdentityOrRemoteIdentity::from_bytes(&i.identity)
.unwrap()
.remote_identity(),
node_id: Uuid::from_slice(&i.node_id).unwrap(),
node_name: i.node_name,
node_platform: Platform::try_from(i.node_platform as u8)
.unwrap_or(Platform::Unknown),
last_seen: i.last_seen.into(),
date_created: i.date_created.into(),
})
.collect(),
}
.to_bytes(),
)
.await
.unwrap();
// TODO: Pairing confirmation + rollback
// Called again so the new instances are picked up
library_manager.node.nlm.load_library(&library).await;
let Header::Connected(remote_identities) = Header::from_stream(&mut stream).await.unwrap() else {
todo!("unreachable; todo error handling");
};
P2PManager::resync_handler(
library_manager.node.nlm.clone(),
&mut stream,
peer_id,
self.metadata_manager.get().instances,
remote_identities,
)
.await;
self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id));
library_manager
.node
.nlm
.alert_new_ops(library_id, library.sync.clone())
.await;
stream.flush().await.unwrap();
}
}


@ -10,8 +10,6 @@ use uuid::Uuid;
use crate::node::Platform;
use super::ModelData;
/// Terminology:
/// Instance - DB model which represents a single `.db` file.
/// Originator - begins the pairing process and is asking to join a library that will be selected by the responder.
@ -62,19 +60,6 @@ pub enum PairingConfirmation {
Error,
}
/// 4. Sync the data in the database with the originator.
/// Sent `Responder` -> `Originator`.
#[derive(Debug, PartialEq)]
pub enum SyncData {
Data {
/// Only included in first request and is an **estimate** of how many models will be sent.
/// It will likely be wrong so should be constrained to being used for UI purposes only.
total_models: Option<i64>,
data: ModelData,
},
Finished,
}
impl Instance {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
@ -222,50 +207,6 @@ impl PairingConfirmation {
}
}
impl SyncData {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> Result<Self, (&'static str, decode::Error)> {
let discriminator = stream
.read_u8()
.await
.map_err(|e| ("discriminator", e.into()))?;
match discriminator {
0 => Ok(Self::Data {
total_models: match stream
.read_i64_le()
.await
.map_err(|e| ("total_models", e.into()))?
{
0 => None,
n => Some(n),
},
data: rmp_serde::from_slice(&decode::buf(stream).await.map_err(|e| ("data", e))?)
.unwrap(), // TODO: Error handling
}),
1 => Ok(Self::Finished),
_ => todo!(), // TODO: Error handling
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
let mut buf = Vec::new();
match self {
Self::Data { total_models, data } => {
buf.push(0);
buf.extend((total_models.unwrap_or(0) as i64).to_le_bytes());
encode::buf(&mut buf, &rmp_serde::to_vec_named(data)?);
}
Self::Finished => {
buf.push(1);
}
}
Ok(buf)
}
}
#[cfg(test)]
mod tests {
use sd_p2p::spacetunnel::Identity;
@ -342,24 +283,5 @@ mod tests {
let result = PairingConfirmation::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = SyncData::Data {
total_models: Some(123),
data: ModelData::Location(vec![]),
};
let mut cursor = std::io::Cursor::new(original.to_bytes().unwrap());
let result = SyncData::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = SyncData::Finished;
let mut cursor = std::io::Cursor::new(original.to_bytes().unwrap());
let result = SyncData::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
}
}


@ -1,7 +1,5 @@
use std::{collections::HashMap, sync::Arc};
use futures::future::join_all;
use sd_core_sync::{ingest, GetOpsArgs, SyncManager};
use sd_p2p::{
spacetunnel::{RemoteIdentity, Tunnel},
DiscoveredPeer, PeerId,
@ -17,6 +15,7 @@ use super::{Header, IdentityOrRemoteIdentity, P2PManager, PeerMetadata};
mod proto;
pub use proto::*;
#[derive(Debug, Clone, Copy)]
pub enum InstanceState {
Unavailable,
Discovered(PeerId),
@ -201,85 +200,93 @@ impl NetworkedLibraryManager {
}
// TODO: Error handling
pub async fn alert_new_ops(&self, library_id: Uuid, sync: &Arc<SyncManager>) {
pub async fn alert_new_ops(&self, library_id: Uuid, sync: Arc<sync::Manager>) {
debug!("NetworkedLibraryManager::alert_new_ops({library_id})");
join_all(
self.libraries
.read()
.await
.get(&library_id)
.unwrap()
.instances
.iter()
.filter_map(|(_, i)| match i {
InstanceState::Connected(peer_id) => Some(peer_id),
_ => None,
})
// TODO: Deduplicate any duplicate peer ids -> This is an edge case but still
.map(|peer_id| {
let p2p = self.p2p.clone();
async move {
debug!("Alerting peer '{peer_id:?}' of new sync events for library '{library_id:?}'");
let libraries = self.libraries.read().await;
let library = libraries.get(&library_id).unwrap();
let mut stream =
p2p.manager.stream(*peer_id).await.map_err(|_| ()).unwrap(); // TODO: handle providing incorrect peer id
// libraries only connecting one-way atm
dbg!(&library.instances);
stream
.write_all(&Header::Sync(library_id).to_bytes())
.await
.unwrap();
// TODO: Deduplicate any duplicate peer ids -> This is an edge case but still
for (_, instance) in &library.instances {
let InstanceState::Connected(peer_id) = *instance else {
continue;
};
let mut tunnel = Tunnel::initiator(stream).await.unwrap();
let sync = sync.clone();
let p2p = self.p2p.clone();
tunnel
.write_all(&SyncMessage::NewOperations.to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
tokio::spawn(async move {
debug!(
"Alerting peer '{peer_id:?}' of new sync events for library '{library_id:?}'"
);
let id = match SyncMessage::from_stream(&mut tunnel).await.unwrap() {
SyncMessage::OperationsRequest(resp) => resp,
_ => todo!("unreachable but proper error handling"),
};
let mut stream = p2p
.manager
.stream(peer_id.clone())
.await
.map_err(|_| ())
.unwrap(); // TODO: handle providing incorrect peer id
self.exchange_sync_ops(tunnel, peer_id, library_id, sync)
.await;
}
}),
)
.await;
stream
.write_all(&Header::Sync(library_id).to_bytes())
.await
.unwrap();
let mut tunnel = Tunnel::initiator(stream).await.unwrap();
tunnel
.write_all(&SyncMessage::NewOperations.to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
while let Ok(SyncMessage::OperationsRequest(args)) =
SyncMessage::from_stream(&mut tunnel).await
{
// let args = match .unwrap() {
// => resp,
// _ => todo!("unreachable but proper error handling"),
// };
let ops = sync.get_ops(args).await.unwrap();
debug!(
"Sending '{}' sync ops from peer '{peer_id:?}' for library '{library_id:?}'",
ops.len()
);
tunnel
.write_all(&SyncMessage::OperationsRequestResponse(ops).to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
}
});
}
}
// Ask the remote for operations and then ingest them
pub async fn request_and_ingest_ops(
pub async fn request_ops(
&self,
mut tunnel: Tunnel,
tunnel: &mut Tunnel,
args: GetOpsArgs,
sync: &SyncManager,
library_id: Uuid,
) {
) -> Vec<sd_sync::CRDTOperation> {
tunnel
.write_all(&SyncMessage::OperationsRequest(args).to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
let SyncMessage::OperationsRequestResponse(ops) = SyncMessage::from_stream(&mut tunnel).await.unwrap() else {
let SyncMessage::OperationsRequestResponse(ops) = SyncMessage::from_stream(tunnel).await.unwrap() else {
todo!("unreachable but proper error handling")
};
// debug!("Received sync events response w/ id '{id}' from peer '{peer_id:?}' for library '{library_id:?}'");
sync.ingest
.events
.send(ingest::Event::Messages(ingest::MessagesEvent {
instance_id: sync.instance,
messages: ops,
}))
.await
.map_err(|_| "TODO: Handle ingest channel closed, so we don't loose ops")
.unwrap();
ops
}
// TODO: Error handling
@ -288,15 +295,10 @@ impl NetworkedLibraryManager {
mut tunnel: Tunnel,
peer_id: &PeerId,
library_id: Uuid,
sync: &SyncManager,
sync: &sync::Manager,
args: GetOpsArgs,
) {
let ops = sync
.get_ops(sd_core_sync::GetOpsArgs {
clocks: vec![],
count: 100,
})
.await
.unwrap();
let ops = sync.get_ops(args).await.unwrap();
debug!(
"Sending '{}' sync ops from peer '{peer_id:?}' for library '{library_id:?}'",


@ -47,7 +47,6 @@ function OriginatorDialog({
ctaLabel="Done"
// closeLabel="Cancel"
onSubmit={async () => {
alert('TODO');
// TODO: Change into the new library
}}
// onCancelled={() => acceptSpacedrop.mutate([props.dropId, null])}