Improve Sync + P2P Integration (#1265)

* Big bruh moment

* whoops

* Less stackoverflowy debug

* stuff

* Fix flawed P2P mDNS instance advertisements

* do sync when connecting with peer

* Sync after pairing

* resync_part2 all the time

* Invalidate all the things

* Invalidate whole React Query on sync event

* emit_messages_flag

* emit_messages_flag

* Backend feature flags + "emitSyncEvents" feature

* Patch `confirm` type cause Tauri cringe

* clippy

* idk but plz work

* bruh

* Fix ComLink bug

* remove log

---------

Co-authored-by: Brendan Allan <brendonovich@outlook.com>
Authored by Oscar Beaumont on 2023-08-30 01:54:58 +08:00; committed by GitHub
parent c674b7107a
commit d758977d82
30 changed files with 492 additions and 185 deletions

Cargo.lock (generated)
View file

@ -7009,6 +7009,7 @@ dependencies = [
"serde_json",
"slotmap",
"tokio",
"tracing 0.2.0",
"uhlc",
"uuid",
]

View file

@ -40,7 +40,6 @@
"vite": "^4.0.4",
"vite-plugin-svgr": "^2.2.1",
"vite-tsconfig-paths": "^4.0.3",
"vite-plugin-comlink": "^3.0.5",
"vite-plugin-html": "^3.2.0"
}
}

View file

@ -1,5 +1,4 @@
import { Plugin, mergeConfig } from 'vite';
import { comlink } from 'vite-plugin-comlink';
import baseConfig from '../../packages/config/vite';
const devtoolsPlugin: Plugin = {
@ -21,8 +20,5 @@ export default mergeConfig(baseConfig, {
server: {
port: 8001
},
plugins: [devtoolsPlugin, comlink()],
worker: {
plugins: [comlink()]
}
plugins: [devtoolsPlugin],
});

View file

@ -20,6 +20,10 @@ if (typeof globalThis.CustomEvent !== 'function') {
};
}
globalThis.confirm = () => {
throw new Error("TODO: Implement 'confirm' for mobile");
};
const _localStorage = new Map<string, string>();
// We patch stuff onto `globalThis` so that `@sd/client` can use it. This is super hacky but as far as I can tell, there's no better way to do this.

View file

@ -4,6 +4,6 @@
"main": "index.js",
"license": "GPL-3.0-only",
"scripts": {
"dev": "RUST_LOG=\"sd_core=info\" cargo watch -x 'run -p sd-server'"
"dev": "cargo watch -x 'run -p sd-server'"
}
}

View file

@ -5,7 +5,6 @@ edition = "2021"
[features]
default = []
emit-messages = []
[dependencies]
sd-prisma = { path = "../../../crates/prisma" }
@ -17,5 +16,6 @@ serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }
tracing = { workspace = true }
uhlc = "0.5.2"
slotmap = "1.0.6"

View file

@ -10,11 +10,13 @@ use uuid::Uuid;
use crate::{actor::*, wait, SharedState};
#[derive(Debug)]
#[must_use]
/// Requests that are handled outside the actor
pub enum Request {
Messages { timestamps: Vec<(Uuid, NTP64)> },
Ingested,
FinishedIngesting,
}
/// Events that the actor consumes
@ -63,20 +65,18 @@ impl Actor {
State::Ingesting(wait!(self.io.event_rx, Event::Messages(event) => event))
}
State::Ingesting(event) => {
let count = event.messages.len();
dbg!(&event.messages);
for op in event.messages {
let fut = self.receive_crdt_operation(op);
fut.await;
}
println!("Ingested {count} messages!");
match event.has_more {
true => State::RetrievingMessages,
false => State::WaitingForNotification,
false => {
self.io.send(Request::FinishedIngesting).await.ok();
State::WaitingForNotification
}
}
}
};
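
The new `Request::FinishedIngesting` variant gives the peer-facing side a clean stopping point instead of waiting on a dead channel. A minimal sketch of how a consumer might drive that handshake, assuming a plain mpsc channel in place of the actor's real IO plumbing and integer pairs in place of the real `(Uuid, NTP64)` timestamps:

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
enum Request {
    Messages { timestamps: Vec<(u128, u64)> }, // stand-ins for the real (Uuid, NTP64) pairs
    Ingested,
    FinishedIngesting,
}

// Answer `Messages` requests until the actor reports `FinishedIngesting`.
async fn drive(mut req_rx: mpsc::Receiver<Request>) {
    while let Some(req) = req_rx.recv().await {
        match req {
            Request::Messages { timestamps } => {
                // A real responder would fetch ops newer than `timestamps`
                // from its peer and feed them back to the actor as messages.
                println!("peer wants ops newer than {timestamps:?}");
            }
            Request::Ingested => continue, // ops landed; nothing to do in this sketch
            Request::FinishedIngesting => break, // the actor has caught up; stop
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(4);
    tx.send(Request::Messages { timestamps: vec![] }).await.unwrap();
    tx.send(Request::Ingested).await.unwrap();
    tx.send(Request::FinishedIngesting).await.unwrap();
    drive(rx).await;
}
```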

View file

@ -8,7 +8,10 @@ mod manager;
use sd_prisma::prisma::*;
use sd_sync::*;
use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
sync::{atomic::AtomicBool, Arc},
};
pub use ingest::*;
pub use manager::*;
@ -24,6 +27,7 @@ pub type Timestamps = Arc<tokio::sync::RwLock<HashMap<uuid::Uuid, NTP64>>>;
pub struct SharedState {
pub db: Arc<PrismaClient>,
pub emit_messages_flag: Arc<AtomicBool>,
pub instance: uuid::Uuid,
pub timestamps: Timestamps,
pub clock: uhlc::HLC,

View file

@ -3,7 +3,14 @@ use sd_sync::*;
use sd_utils::uuid_to_bytes;
use crate::{db_operation::*, *};
use std::{cmp::Ordering, ops::Deref, sync::Arc};
use std::{
cmp::Ordering,
ops::Deref,
sync::{
atomic::{self, AtomicBool},
Arc,
},
};
use tokio::sync::broadcast;
use uhlc::{HLCBuilder, HLC};
use uuid::Uuid;
@ -26,7 +33,11 @@ pub struct New<T> {
}
impl Manager {
pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> New<Self> {
pub fn new(
db: &Arc<PrismaClient>,
instance: Uuid,
emit_messages_flag: &Arc<AtomicBool>,
) -> New<Self> {
let (tx, rx) = broadcast::channel(64);
let timestamps: Timestamps = Default::default();
@ -37,6 +48,7 @@ impl Manager {
instance,
timestamps,
clock,
emit_messages_flag: emit_messages_flag.clone(),
});
let ingest = ingest::Actor::spawn(shared.clone());
@ -52,8 +64,9 @@ impl Manager {
tx: &PrismaClient,
(_ops, queries): (Vec<CRDTOperation>, I),
) -> prisma_client_rust::Result<<I as prisma_client_rust::BatchItemParent>::ReturnValue> {
#[cfg(feature = "emit-messages")]
let res = {
// let start = Instant::now();
let ret = if self.emit_messages_flag.load(atomic::Ordering::Relaxed) {
macro_rules! variant {
($var:ident, $variant:ident, $fn:ident) => {
let $var = _ops
@ -76,11 +89,13 @@ impl Manager {
self.tx.send(SyncMessage::Created).ok();
res
} else {
tx._batch([queries]).await?.remove(0)
};
#[cfg(not(feature = "emit-messages"))]
let res = tx._batch([queries]).await?.remove(0);
Ok(res)
// debug!("time: {}", start.elapsed().as_millis());
Ok(ret)
}
#[allow(unused_variables)]
@ -90,8 +105,7 @@ impl Manager {
op: CRDTOperation,
query: Q,
) -> prisma_client_rust::Result<<Q as prisma_client_rust::BatchItemParent>::ReturnValue> {
#[cfg(feature = "emit-messages")]
let ret = {
let ret = if self.emit_messages_flag.load(atomic::Ordering::Relaxed) {
macro_rules! exec {
($fn:ident, $inner:ident) => {
tx._batch(($fn(&op, $inner).to_query(tx), query)).await?.1
@ -106,9 +120,9 @@ impl Manager {
self.tx.send(SyncMessage::Created).ok();
ret
} else {
tx._batch(vec![query]).await?.remove(0)
};
#[cfg(not(feature = "emit-messages"))]
let ret = tx._batch(vec![query]).await?.remove(0);
Ok(ret)
}
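
The `emit-messages` Cargo feature is gone from this code path: emission is now gated at runtime by an `AtomicBool` shared between the node and every library's sync manager, so the flag can be flipped from the frontend without rebuilding. A minimal sketch of the compile-time-to-runtime swap (names mirror the diff; the query bodies are elided):

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

struct Manager {
    emit_messages_flag: Arc<AtomicBool>,
}

impl Manager {
    fn write_op(&self) {
        // Previously `#[cfg(feature = "emit-messages")]` picked this branch at
        // compile time; now the same decision happens per call.
        if self.emit_messages_flag.load(Ordering::Relaxed) {
            println!("batch the CRDT op with the query + broadcast SyncMessage::Created");
        } else {
            println!("run the bare query");
        }
    }
}

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    let manager = Manager { emit_messages_flag: flag.clone() };
    manager.write_op(); // "run the bare query"
    flag.store(true, Ordering::Relaxed); // flipped by the `toggleFeatureFlag` mutation
    manager.write_op(); // "batch the CRDT op ..."
}
```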

View file

@ -47,7 +47,7 @@ impl Instance {
.await
.unwrap();
let sync = sd_core_sync::Manager::new(&db, id);
let sync = sd_core_sync::Manager::new(&db, id, &Default::default());
(
Arc::new(Self {

View file

@ -1,8 +1,9 @@
use crate::{job::JobProgressEvent, node::config::NodeConfig, Node};
use rspc::{alpha::Rspc, Config};
use crate::{invalidate_query, job::JobProgressEvent, node::config::NodeConfig, Node};
use itertools::Itertools;
use rspc::{alpha::Rspc, Config, ErrorCode};
use serde::{Deserialize, Serialize};
use specta::Type;
use std::sync::Arc;
use std::sync::{atomic::Ordering, Arc};
use uuid::Uuid;
use utils::{InvalidRequests, InvalidateOperationEvent};
@ -21,6 +22,15 @@ pub enum CoreEvent {
InvalidateOperation(InvalidateOperationEvent),
}
/// All of the feature flags provided by the core itself. The frontend has its own set of feature flags!
///
/// If you want a variant of this to show up on the frontend, it must be added to `backendFeatures` in `useFeatureFlag.tsx`.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Type)]
#[serde(rename_all = "camelCase")]
pub enum BackendFeature {
SyncEmitMessages,
}
mod backups;
mod categories;
mod files;
@ -47,6 +57,7 @@ pub struct SanitisedNodeConfig {
pub name: String,
// The port this node uses for peer-to-peer communication. By default, a random free port is chosen each time the application starts.
pub p2p_port: Option<u32>,
pub features: Vec<BackendFeature>,
// TODO: These will probs be replaced by your Spacedrive account in the near future.
pub p2p_email: Option<String>,
pub p2p_img_url: Option<String>,
@ -58,6 +69,7 @@ impl From<NodeConfig> for SanitisedNodeConfig {
id: value.id,
name: value.name,
p2p_port: value.p2p_port,
features: value.features,
p2p_email: value.p2p_email,
p2p_img_url: value.p2p_img_url,
}
@ -100,6 +112,40 @@ pub(crate) fn mount() -> Arc<Router> {
})
})
})
.procedure("toggleFeatureFlag", {
R.mutation(|node, feature: BackendFeature| async move {
let config = node.config.get().await;
let enabled = if config.features.iter().contains(&feature) {
node.config
.write(|mut cfg| {
cfg.features.retain(|f| *f != feature);
})
.await
.map(|_| false)
} else {
node.config
.write(|mut cfg| {
cfg.features.push(feature.clone());
})
.await
.map(|_| true)
}
.map_err(|err| rspc::Error::new(ErrorCode::InternalServerError, err.to_string()))?;
match feature {
BackendFeature::SyncEmitMessages => {
node.libraries
.emit_messages_flag
.store(enabled, Ordering::Relaxed);
}
}
invalidate_query!(node; node, "nodeState");
Ok(())
})
})
.merge("search.", search::mount())
.merge("library.", libraries::mount())
.merge("volumes.", volumes::mount())

View file

@ -25,6 +25,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
// TODO: Don't block subscription start
#[allow(clippy::unwrap_used)] // TODO: P2P isn't stable yet lol
for peer_id in node.p2p.manager.get_connected_peers().await.unwrap() {
yield P2PEvent::ConnectedPeer {
peer_id,

View file

@ -25,18 +25,31 @@ use std::sync::Mutex;
pub(crate) static INVALIDATION_REQUESTS: Mutex<InvalidRequests> =
Mutex::new(InvalidRequests::new());
// FYI: This exists to keep the enum fields private.
#[derive(Debug, Clone, Serialize, Type)]
pub struct InvalidateOperationEvent {
pub struct SingleInvalidateOperationEvent {
/// These fields are intentionally private.
key: &'static str,
arg: Value,
result: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Type)]
#[serde(tag = "type", content = "data", rename_all = "camelCase")]
pub enum InvalidateOperationEvent {
Single(SingleInvalidateOperationEvent),
// TODO: A temporary hack used with Brendan's sync system until the v2 invalidation system is implemented.
All,
}
impl InvalidateOperationEvent {
/// If you are using this function, you're doing it wrong.
pub fn dangerously_create(key: &'static str, arg: Value, result: Option<Value>) -> Self {
Self { key, arg, result }
Self::Single(SingleInvalidateOperationEvent { key, arg, result })
}
pub fn all() -> Self {
Self::All
}
}
@ -287,31 +300,46 @@ pub(crate) fn mount_invalidate() -> AlphaRouter<Ctx> {
let manager_thread_active = manager_thread_active.clone();
tokio::spawn(async move {
let mut buf = HashMap::with_capacity(100);
let mut invalidate_all = false;
loop {
tokio::select! {
event = event_bus_rx.recv() => {
if let Ok(event) = event {
if let CoreEvent::InvalidateOperation(op) = event {
// Newer data replaces older data in the buffer
match to_key(&(op.key, &op.arg)) {
Ok(key) => {
buf.insert(key, op);
},
Err(err) => {
warn!("Error deriving key for invalidate operation '{:?}': {:?}", op, err);
},
if invalidate_all {
continue;
}
match &op {
InvalidateOperationEvent::Single(SingleInvalidateOperationEvent { key, arg, .. }) => {
// Newer data replaces older data in the buffer
match to_key(&(key, &arg)) {
Ok(key) => {
buf.insert(key, op);
},
Err(err) => {
warn!("Error deriving key for invalidate operation '{:?}': {:?}", op, err);
},
}
},
InvalidateOperationEvent::All => {
invalidate_all = true;
buf.clear();
}
}
}
} else {
warn!("Shutting down invalidation manager thread due to the core event bus being dropped!");
break;
}
},
// THROTTLE: Given a human reaction time of ~250ms, this should be a good balance.
_ = tokio::time::sleep(Duration::from_millis(10)) => {
let events = buf.drain().map(|(_k, v)| v).collect::<Vec<_>>();
let events = match invalidate_all {
true => vec![InvalidateOperationEvent::all()],
false => buf.drain().map(|(_k, v)| v).collect::<Vec<_>>(),
};
if !events.is_empty() {
match tx.send(events) {
Ok(_) => {},
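
The invalidation manager's debounce buffer gains a collapse rule: once an `InvalidateOperationEvent::All` arrives, everything buffered so far is dropped and further single events are ignored until the next ~10ms flush, which then emits a single `All`. A self-contained sketch of just that buffering rule, with string keys standing in for the real serialized query keys:

```rust
use std::collections::HashSet;

enum Event {
    Single(&'static str),
    All,
}

fn main() {
    let mut buf: HashSet<&'static str> = HashSet::new();
    let mut invalidate_all = false;

    for ev in [Event::Single("nodeState"), Event::All, Event::Single("tags.list")] {
        if invalidate_all {
            continue; // an `All` is already pending; drop further singles
        }
        match ev {
            // Newer data replaces older data in the buffer (keyed dedupe).
            Event::Single(key) => {
                buf.insert(key);
            }
            // `All` subsumes everything buffered so far.
            Event::All => {
                invalidate_all = true;
                buf.clear();
            }
        }
    }

    // At the next throttle tick, one `All` is flushed instead of the batch.
    let flushed: Vec<_> = if invalidate_all {
        vec!["<invalidate everything>"]
    } else {
        buf.into_iter().collect()
    };
    println!("{flushed:?}");
}
```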

View file

@ -1,4 +1,5 @@
use crate::{
api::{utils::InvalidateOperationEvent, CoreEvent},
invalidate_query,
location::indexer,
node::Platform,
@ -19,7 +20,7 @@ use std::{
collections::HashMap,
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
sync::{atomic::AtomicBool, Arc},
};
use chrono::Utc;
@ -56,6 +57,7 @@ pub struct Libraries {
tx: mpscrr::Sender<LibraryManagerEvent, ()>,
/// A channel for receiving events from the library manager.
pub rx: mpscrr::Receiver<LibraryManagerEvent, ()>,
pub emit_messages_flag: Arc<AtomicBool>,
}
impl Libraries {
@ -70,6 +72,7 @@ impl Libraries {
libraries: Default::default(),
tx,
rx,
emit_messages_flag: Arc::new(AtomicBool::new(false)),
}))
}
@ -379,7 +382,7 @@ impl Libraries {
// let key_manager = Arc::new(KeyManager::new(vec![]).await?);
// seed_keymanager(&db, &key_manager).await?;
let mut sync = sync::Manager::new(&db, instance_id);
let mut sync = sync::Manager::new(&db, instance_id, &self.emit_messages_flag);
let library = Library::new(
id,
@ -400,11 +403,19 @@ impl Libraries {
async move {
loop {
let Ok(SyncMessage::Created) = sync.rx.recv().await else {
let Ok(msg) = sync.rx.recv().await else {
continue;
};
p2p::sync::originator(id, &library.sync, &node.nlm, &node.p2p).await;
match msg {
// TODO: Any sync event invalidates the entire React Query cache; this is a hacky workaround until the new invalidation system lands.
SyncMessage::Ingested => node.emit(CoreEvent::InvalidateOperation(
InvalidateOperationEvent::all(),
)),
SyncMessage::Created => {
p2p::sync::originator(id, &library.sync, &node.nlm, &node.p2p).await
}
}
}
}
});

View file

@ -9,7 +9,7 @@ use tokio::sync::{RwLock, RwLockWriteGuard};
use uuid::Uuid;
use crate::{
api::notifications::Notification,
api::{notifications::Notification, BackendFeature},
util::migrator::{Migrate, MigratorError},
};
@ -31,6 +31,9 @@ pub struct NodeConfig {
/// The p2p identity keypair for this node. This is used to identify the node on the network.
/// This keypair does effectively nothing except for provide libp2p with a stable peer_id.
pub keypair: Keypair,
/// Feature flags enabled on the node
#[serde(default)]
pub features: Vec<BackendFeature>,
// TODO: These will probs be replaced by your Spacedrive account in the near future.
pub p2p_email: Option<String>,
pub p2p_img_url: Option<String>,
@ -55,6 +58,7 @@ impl Migrate for NodeConfig {
},
p2p_port: None,
keypair: Keypair::generate(),
features: vec![],
p2p_email: None,
p2p_img_url: None,
notifications: vec![],
@ -86,10 +90,11 @@ impl Default for NodeConfig {
}
},
p2p_port: None,
features: vec![],
keypair: Keypair::generate(),
p2p_email: None,
p2p_img_url: None,
notifications: Vec::new(),
notifications: vec![],
}
}
}
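
`features` is the only new field on an already-shipped config file, hence the `#[serde(default)]`: configs written before this commit have no `features` key and should keep deserializing instead of erroring. A sketch of that behaviour, assuming `serde`/`serde_json` as in the workspace (the real config goes through the migrator):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct NodeConfig {
    name: String,
    #[serde(default)] // absent in old config files -> empty vec instead of an error
    features: Vec<String>, // `Vec<BackendFeature>` in the real struct
}

fn main() {
    let old_config = r#"{ "name": "my-node" }"#; // pre-upgrade file, no `features` key
    let cfg: NodeConfig = serde_json::from_str(old_config).unwrap();
    assert!(cfg.features.is_empty());
    println!("{cfg:?}");
}
```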

View file

@ -30,7 +30,7 @@ use crate::{
};
use super::{
sync::{NetworkedLibraries, SyncMessage},
sync::{InstanceState, NetworkedLibraries, SyncMessage},
Header, PairingManager, PairingStatus, PeerMetadata,
};
@ -174,15 +174,26 @@ impl P2PManager {
node.nlm.peer_connected(event.peer_id).await;
if event.establisher {
let manager = manager.clone();
let nlm = node.nlm.clone();
let instances = metadata_manager.get().instances;
tokio::spawn(async move {
let manager = manager.clone();
let nlm = node.nlm.clone();
let instances = metadata_manager.get().instances;
let node = node.clone();
tokio::spawn(async move {
if event.establisher {
let mut stream = manager.stream(event.peer_id).await.unwrap();
Self::resync(nlm, &mut stream, event.peer_id, instances).await;
});
}
Self::resync(
nlm.clone(),
&mut stream,
event.peer_id,
instances,
)
.await;
drop(stream);
}
Self::resync_part2(nlm, node, &event.peer_id).await;
});
}
Event::PeerDisconnected(peer_id) => {
events
@ -283,11 +294,9 @@ impl P2PManager {
let library =
node.libraries.get_library(&library_id).await.unwrap();
dbg!(&msg);
match msg {
SyncMessage::NewOperations => {
super::sync::responder(tunnel, library).await;
super::sync::responder(&mut tunnel, library).await;
}
};
}
@ -299,7 +308,7 @@ impl P2PManager {
metadata_manager.get().instances,
identities,
)
.await
.await;
}
}
});
@ -382,6 +391,42 @@ impl P2PManager {
.unwrap();
}
// TODO: Use a tunnel for security; right now all sync events here are unencrypted
pub async fn resync_part2(
nlm: Arc<NetworkedLibraries>,
node: Arc<Node>,
connected_with_peer_id: &PeerId,
) {
for (library_id, data) in nlm.state().await {
let mut library = None;
for (_, data) in data.instances {
let InstanceState::Connected(instance_peer_id) = data else {
continue;
};
if instance_peer_id != *connected_with_peer_id {
continue;
};
let library = match library.clone() {
Some(library) => library,
None => match node.libraries.get_library(&library_id).await {
Some(new_library) => {
library = Some(new_library.clone());
new_library
}
None => continue,
},
};
// Remember, originator creates a new stream internally so the handler for this doesn't have to do anything.
super::sync::originator(library_id, &library.sync, &node.nlm, &node.p2p).await;
}
}
}
pub async fn accept_spacedrop(&self, id: Uuid, path: String) {
if let Some(chan) = self.spacedrop_pairing_reqs.lock().await.remove(&id) {
chan.send(Some(path)).unwrap();

View file

@ -203,7 +203,7 @@ impl PairingManager {
.unwrap();
// Called again so the new instances are picked up
node.libraries.update_instances(library);
node.libraries.update_instances(library.clone()).await;
P2PManager::resync(
node.nlm.clone(),
@ -216,6 +216,9 @@ impl PairingManager {
// TODO: Done message to frontend
self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id));
stream.flush().await.unwrap();
// Remember, originator creates a new stream internally so the handler for this doesn't have to do anything.
super::sync::originator(library_id, &library.sync, &node.nlm, &node.p2p).await;
}
PairingResponse::Rejected => {
info!("Pairing '{pairing_id}' rejected by remote");
@ -342,10 +345,10 @@ impl PairingManager {
.await;
self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id));
super::sync::originator(library_id, &library.sync, &node.nlm, &node.p2p).await;
stream.flush().await.unwrap();
// Remember, originator creates a new stream internally so the handler for this doesn't have to do anything.
super::sync::originator(library_id, &library.sync, &node.nlm, &node.p2p).await;
}
}

View file

@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc};
use sd_core_sync::ingest;
use itertools::{Either, Itertools};
use sd_p2p::{
proto::{decode, encode},
spacetunnel::{RemoteIdentity, Tunnel},
@ -12,10 +12,10 @@ use specta::Type;
use sync::GetOpsArgs;
use tokio::{
io::{AsyncRead, AsyncWriteExt},
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
sync::RwLock,
};
use tracing::{debug, error};
use tracing::*;
use uuid::Uuid;
use crate::{
@ -37,7 +37,7 @@ pub enum InstanceState {
#[derive(Debug, Clone, Serialize, Type)]
pub struct LibraryData {
instances: HashMap<RemoteIdentity /* Identity public key */, InstanceState>,
pub instances: HashMap<RemoteIdentity /* Identity public key */, InstanceState>,
}
type LibrariesMap = HashMap<Uuid /* Library ID */, LibraryData>;
@ -45,6 +45,8 @@ type LibrariesMap = HashMap<Uuid /* Library ID */, LibraryData>;
pub struct NetworkedLibraries {
p2p: Arc<P2PManager>,
pub(crate) libraries: RwLock<HashMap<Uuid /* Library ID */, LibraryData>>,
// A list of all instances that this node owns (has the private key for)
owned_instances: RwLock<HashMap<Uuid /* Library ID */, RemoteIdentity>>,
}
impl NetworkedLibraries {
@ -52,6 +54,7 @@ impl NetworkedLibraries {
let this = Arc::new(Self {
p2p,
libraries: Default::default(),
owned_instances: Default::default(),
});
tokio::spawn({
@ -91,44 +94,61 @@ impl NetworkedLibraries {
// TODO: Error handling
async fn load_library(self: &Arc<Self>, library: &Library) {
let instances = library
let (db_owned_instances, db_instances): (Vec<_>, Vec<_>) = library
.db
.instance()
.find_many(vec![])
.exec()
.await
.unwrap();
let metadata_instances = instances
.iter()
.map(|i| {
IdentityOrRemoteIdentity::from_bytes(&i.identity)
.unwrap()
.remote_identity()
})
.collect();
.unwrap()
.into_iter()
.partition_map(
// TODO: Error handling
|i| match IdentityOrRemoteIdentity::from_bytes(&i.identity).unwrap() {
IdentityOrRemoteIdentity::Identity(identity) => Either::Left(identity),
IdentityOrRemoteIdentity::RemoteIdentity(identity) => Either::Right(identity),
},
);
// Lock them together to ensure changes to both become visible to readers at the same time
let mut libraries = self.libraries.write().await;
let mut owned_instances = self.owned_instances.write().await;
// `self.owned_instances` exists so this call to `load_library` doesn't override instances of other libraries.
if db_owned_instances.len() != 1 {
panic!(
"Library has '{}' owned instance! Something has gone very wrong!",
db_owned_instances.len()
);
}
owned_instances.insert(library.id, db_owned_instances[0].to_remote_identity());
let mut old_data = libraries.remove(&library.id);
libraries.insert(
library.id,
LibraryData {
instances: instances
// We register all remote instances (`IdentityOrRemoteIdentity::RemoteIdentity` only) to track connection state.
instances: db_instances
.into_iter()
.filter_map(|i| {
// TODO: Error handling
match IdentityOrRemoteIdentity::from_bytes(&i.identity).unwrap() {
// We don't own it so don't advertise it
IdentityOrRemoteIdentity::Identity(_) => None,
IdentityOrRemoteIdentity::RemoteIdentity(identity) => {
Some((identity, InstanceState::Unavailable))
}
}
.map(|identity| {
(
identity.clone(),
match old_data
.as_mut()
.and_then(|d| d.instances.remove(&identity))
{
Some(data) => data,
None => InstanceState::Unavailable,
},
)
})
.collect(),
},
);
self.p2p.update_metadata(metadata_instances).await;
self.p2p
.update_metadata(owned_instances.values().cloned().collect::<Vec<_>>())
.await;
}
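
`load_library` now splits each library's instance records in one pass: instances we own (we hold the private key, so we advertise them over mDNS) versus remote ones (we only track their connection state). Itertools' `partition_map` does the routing via `Either`; a toy version with booleans standing in for the identity parsing:

```rust
use itertools::{Either, Itertools};

fn main() {
    // (instance id, do we hold its private key?)
    let records = vec![("inst-a", true), ("inst-b", false), ("inst-c", false)];

    let (owned, remote): (Vec<_>, Vec<_>) =
        records.into_iter().partition_map(|(id, has_private_key)| {
            if has_private_key {
                Either::Left(id) // advertised via `update_metadata`
            } else {
                Either::Right(id) // registered as `InstanceState::Unavailable`
            }
        });

    assert_eq!(owned, vec!["inst-a"]);
    assert_eq!(remote, vec!["inst-b", "inst-c"]);
}
```
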
async fn edit_library(&self, _library: &Library) {
@ -138,10 +158,16 @@ impl NetworkedLibraries {
}
async fn delete_library(&self, library: &Library) {
// TODO: Do proper library delete/unpair procedure.
self.libraries.write().await.remove(&library.id);
// Lock them together to ensure changes to both become visible to readers at the same time
let mut libraries = self.libraries.write().await;
let mut owned_instances = self.owned_instances.write().await;
// TODO: Update mdns
// TODO: Do proper library delete/unpair procedure.
libraries.remove(&library.id);
owned_instances.remove(&library.id);
self.p2p
.update_metadata(owned_instances.values().cloned().collect::<Vec<_>>())
.await;
}
// TODO: Replace all these follow events with a pub/sub system????
@ -259,6 +285,7 @@ mod originator {
}
}
/// REMEMBER: This only syncs one direction!
pub async fn run(
library_id: Uuid,
sync: &Arc<sync::Manager>,
@ -268,9 +295,6 @@ mod originator {
let libraries = nlm.libraries.read().await;
let library = libraries.get(&library_id).unwrap();
// libraries only connecting one-way atm
dbg!(&library.instances);
// TODO: Deduplicate any duplicate peer IDs -> this is an edge case, but still possible
for instance in library.instances.values() {
let InstanceState::Connected(peer_id) = *instance else {
@ -300,16 +324,11 @@ mod originator {
.unwrap();
tunnel.flush().await.unwrap();
while let Ok(rx::GetOperations::Operations(args)) =
rx::GetOperations::from_stream(&mut tunnel).await
while let Ok(rx::MainRequest::GetOperations(args)) =
rx::MainRequest::from_stream(&mut tunnel).await
{
let ops = sync.get_ops(args).await.unwrap();
debug!(
"Sending '{}' sync ops from peer '{peer_id:?}' for library '{library_id:?}'",
ops.len()
);
tunnel
.write_all(&tx::Operations(ops).to_bytes())
.await
@ -332,12 +351,12 @@ mod responder {
use super::*;
#[derive(Serialize, Deserialize)]
pub enum GetOperations {
Operations(GetOpsArgs),
pub enum MainRequest {
GetOperations(GetOpsArgs),
Done,
}
impl GetOperations {
impl MainRequest {
// TODO: Per-field errors for better error handling
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
@ -357,41 +376,43 @@ mod responder {
}
}
pub async fn run(mut tunnel: Tunnel, library: Arc<Library>) {
pub async fn run(stream: &mut (impl AsyncRead + AsyncWrite + Unpin), library: Arc<Library>) {
let ingest = &library.sync.ingest;
let Ok(mut rx) = ingest.req_rx.try_lock() else {
println!("Rejected sync due to libraries lock being held!");
async fn early_return(stream: &mut (impl AsyncRead + AsyncWrite + Unpin)) {
// TODO: Proper error returned to remote instead of this.
// TODO: We can't just abort the connection when the remote is expecting data.
tunnel
.write_all(&tx::GetOperations::Done.to_bytes())
stream
.write_all(&tx::MainRequest::Done.to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
stream.flush().await.unwrap();
}
return;
let Ok(mut rx) = ingest.req_rx.try_lock() else {
warn!("Rejected sync due to libraries lock being held!");
return early_return(stream).await;
};
ingest
.event_tx
.send(ingest::Event::Notification)
.await
.unwrap();
use sync::ingest::*;
ingest.event_tx.send(Event::Notification).await.unwrap();
while let Some(req) = rx.recv().await {
use sync::ingest::*;
const OPS_PER_REQUEST: u32 = 1000;
const OPS_PER_REQUEST: u32 = 100;
let Request::Messages { timestamps } = req else {
continue;
let timestamps = match req {
Request::FinishedIngesting => break,
Request::Messages { timestamps } => timestamps,
_ => continue,
};
tunnel
debug!("Getting ops for timestamps {timestamps:?}");
stream
.write_all(
&tx::GetOperations::Operations(sync::GetOpsArgs {
&tx::MainRequest::GetOperations(sync::GetOpsArgs {
clocks: timestamps,
count: OPS_PER_REQUEST,
})
@ -399,9 +420,9 @@ mod responder {
)
.await
.unwrap();
tunnel.flush().await.unwrap();
stream.flush().await.unwrap();
let rx::Operations(ops) = rx::Operations::from_stream(&mut tunnel).await.unwrap();
let rx::Operations(ops) = rx::Operations::from_stream(stream).await.unwrap();
ingest
.event_tx
@ -414,10 +435,12 @@ mod responder {
.expect("TODO: Handle ingest channel closed, so we don't loose ops");
}
tunnel
.write_all(&tx::GetOperations::Done.to_bytes())
debug!("Sync responder done");
stream
.write_all(&tx::MainRequest::Done.to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
stream.flush().await.unwrap();
}
}
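
Renaming `GetOperations` to `MainRequest` makes the direction of the protocol explicit: the responder (the side ingesting) drives the conversation, requesting batches of `OPS_PER_REQUEST` (now 100) ops, and the originator answers until it reads `Done`. A schematic of that loop shape, with in-process channels standing in for the tunnel and its real `to_bytes`/`from_stream` framing:

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
enum MainRequest {
    GetOperations { count: u32 },
    Done,
}

#[tokio::main]
async fn main() {
    let (req_tx, mut req_rx) = mpsc::channel::<MainRequest>(1);
    let (ops_tx, mut ops_rx) = mpsc::channel::<Vec<&'static str>>(1);

    // Responder side: asks for batches until its ingest actor would report
    // `FinishedIngesting` (faked here as two iterations), then says `Done`.
    let responder = tokio::spawn(async move {
        for _ in 0..2 {
            req_tx.send(MainRequest::GetOperations { count: 100 }).await.unwrap();
            let ops = ops_rx.recv().await.unwrap();
            println!("ingesting {} ops", ops.len());
        }
        req_tx.send(MainRequest::Done).await.unwrap();
    });

    // Originator side: answers `GetOperations` until it sees `Done`.
    while let Some(req) = req_rx.recv().await {
        match req {
            MainRequest::GetOperations { count } => {
                ops_tx.send(vec!["op"; count.min(2) as usize]).await.unwrap();
            }
            MainRequest::Done => break,
        }
    }
    responder.await.unwrap();
}
```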

View file

@ -8,6 +8,7 @@ pub enum SyncMessage {
impl SyncMessage {
// TODO: Per-field errors for better error handling
// TODO: Use `decode::Error` instead of `io::Result`
pub async fn from_stream(stream: &mut (impl AsyncRead + Unpin)) -> std::io::Result<Self> {
match stream.read_u8().await? {
b'N' => Ok(Self::NewOperations),

View file

@ -1,10 +1,14 @@
use std::{net::SocketAddr, sync::Arc};
use std::{
fmt::{self, Formatter},
net::SocketAddr,
sync::Arc,
};
use crate::{Manager, ManagerStreamAction, Metadata, PeerId};
/// Represents a discovered peer.
/// This is held by [Manager] to keep track of discovered peers
#[derive(Debug, Clone)]
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[cfg_attr(feature = "specta", derive(specta::Type))]
pub struct DiscoveredPeer<TMetadata: Metadata> {
@ -18,6 +22,17 @@ pub struct DiscoveredPeer<TMetadata: Metadata> {
pub addresses: Vec<SocketAddr>,
}
// `Manager` impls `Debug`, but deriving it here causes an infinite loop and a stack overflow.
impl<TMetadata: Metadata> fmt::Debug for DiscoveredPeer<TMetadata> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("DiscoveredPeer")
.field("peer_id", &self.peer_id)
.field("metadata", &self.metadata)
.field("addresses", &self.addresses)
.finish()
}
}
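
A sketch of the shape of that cycle, assuming (as the comment implies) that `DiscoveredPeer` carries a handle back to its `Manager`: if `Manager`'s `Debug` printed its peers and each peer's derived `Debug` printed its manager, formatting would recurse until the stack overflows. The manual impl simply skips the back-reference:

```rust
use std::fmt;

struct Manager {
    peers: Vec<DiscoveredPeer>,
}

struct DiscoveredPeer {
    peer_id: u64,
    // manager: Arc<Manager>, // the back-edge that made `#[derive(Debug)]` loop
}

impl fmt::Debug for DiscoveredPeer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiscoveredPeer")
            .field("peer_id", &self.peer_id)
            .finish() // back-reference intentionally omitted
    }
}

fn main() {
    let m = Manager { peers: vec![DiscoveredPeer { peer_id: 7 }] };
    println!("{:?}", m.peers);
}
```
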
impl<TMetadata: Metadata> DiscoveredPeer<TMetadata> {
/// dial will queue an event to start a connection with the peer
pub async fn dial(self) {

View file

@ -1,5 +1,6 @@
import { SiCheckmarx } from '@icons-pack/react-simple-icons';
import {
backendFeatures,
features,
getDebugState,
isEnabled,
@ -161,24 +162,22 @@ function FeatureFlagSelector() {
return (
<DropdownMenu.Root
trigger={
<Dropdown.Button variant="gray">
<Dropdown.Button variant="gray" className="w-full">
<span className="truncate">Feature Flags</span>
</Dropdown.Button>
}
className="mt-1 shadow-none data-[side=bottom]:slide-in-from-top-2 dark:divide-menu-selected/30 dark:border-sidebar-line dark:bg-sidebar-box"
alignToTrigger
>
{features.map((feat) => (
<div key={feat} className="flex text-white">
{isEnabled(feat) && <SiCheckmarx />}
<DropdownMenu.Item
label={feat}
iconProps={{ weight: 'bold', size: 16 }}
onClick={() => toggleFeatureFlag(feat)}
className="font-medium"
/>
</div>
{[...features, ...backendFeatures].map((feat) => (
<DropdownMenu.Item
key={feat}
label={feat}
iconProps={{ weight: 'bold', size: 16 }}
onClick={() => toggleFeatureFlag(feat)}
className="font-medium text-white"
icon={isEnabled(feat) ? SiCheckmarx : undefined}
/>
))}
</DropdownMenu.Root>
);

View file

@ -8,7 +8,12 @@ import duration from 'dayjs/plugin/duration';
import relativeTime from 'dayjs/plugin/relativeTime';
import { ErrorBoundary } from 'react-error-boundary';
import { RouterProvider, RouterProviderProps } from 'react-router-dom';
import { NotificationContextProvider, P2PContextProvider, useDebugState } from '@sd/client';
import {
NotificationContextProvider,
P2PContextProvider,
useDebugState,
useLoadBackendFeatureFlags
} from '@sd/client';
import ErrorFallback from './ErrorFallback';
import { P2P } from './app/p2p';
@ -46,6 +51,8 @@ const Devtools = () => {
};
export const SpacedriveInterface = (props: { router: RouterProviderProps['router'] }) => {
useLoadBackendFeatureFlags();
return (
<ErrorBoundary FallbackComponent={ErrorFallback}>
<P2PContextProvider>

View file

@ -84,7 +84,8 @@ export type Procedures = {
{ key: "tags.assign", input: LibraryArgs<TagAssignArgs>, result: null } |
{ key: "tags.create", input: LibraryArgs<TagCreateArgs>, result: Tag } |
{ key: "tags.delete", input: LibraryArgs<number>, result: null } |
{ key: "tags.update", input: LibraryArgs<TagUpdateArgs>, result: null },
{ key: "tags.update", input: LibraryArgs<TagUpdateArgs>, result: null } |
{ key: "toggleFeatureFlag", input: BackendFeature, result: null },
subscriptions:
{ key: "invalidation.listen", input: never, result: InvalidateOperationEvent[] } |
{ key: "jobs.newThumbnail", input: LibraryArgs<null>, result: string[] } |
@ -99,6 +100,13 @@ export type Procedures = {
export type AudioMetadata = { duration: number | null; audio_codec: string | null }
/**
 * All of the feature flags provided by the core itself. The frontend has its own set of feature flags!
 *
 * If you want a variant of this to show up on the frontend, it must be added to `backendFeatures` in `useFeatureFlag.tsx`.
*/
export type BackendFeature = "syncEmitMessages"
export type Backup = ({ id: string; timestamp: string; library_id: string; library_name: string }) & { path: string }
export type BuildInfo = { version: string; commit: string }
@ -199,7 +207,7 @@ export type IndexerRuleCreateArgs = { name: string; dry_run: boolean; rules: ([R
export type InstanceState = "Unavailable" | { Discovered: PeerId } | { Connected: PeerId }
export type InvalidateOperationEvent = { key: string; arg: any; result: any | null }
export type InvalidateOperationEvent = { type: "single"; data: SingleInvalidateOperationEvent } | { type: "all" }
export type JobGroup = { id: string; action: string | null; status: JobStatus; created_at: string; jobs: JobReport[] }
@ -267,7 +275,7 @@ export type MediaMetadata = ({ type: "Image" } & ImageMetadata) | ({ type: "Vide
*/
export type MediaTime = { Naive: string } | { Utc: string } | "Undefined"
export type NodeState = ({ id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null }) & { data_path: string }
export type NodeState = ({ id: string; name: string; p2p_port: number | null; features: BackendFeature[]; p2p_email: string | null; p2p_img_url: string | null }) & { data_path: string }
export type NonIndexedFileSystemEntries = { entries: ExplorerItem[]; errors: Error[] }
@ -345,7 +353,7 @@ export type RescanArgs = { location_id: number; sub_path: string }
export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent"
export type SanitisedNodeConfig = { id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null }
export type SanitisedNodeConfig = { id: string; name: string; p2p_port: number | null; features: BackendFeature[]; p2p_email: string | null; p2p_img_url: string | null }
export type SearchData<T> = { cursor: number[] | null; items: T[] }
@ -357,6 +365,8 @@ export type SharedOperation = { record_id: any; model: string; data: SharedOpera
export type SharedOperationData = "c" | { u: { field: string; value: any } } | "d"
export type SingleInvalidateOperationEvent = { key: string; arg: any; result: any | null }
export type SortOrder = "Asc" | "Desc"
export type SpacedropArgs = { peer_id: PeerId; file_path: string[] }

View file

@ -1,14 +1,46 @@
import { useEffect } from 'react';
import { subscribe, useSnapshot } from 'valtio';
import type { BackendFeature } from '../core';
import { valtioPersist } from '../lib/valito';
import { nonLibraryClient, useBridgeQuery } from '../rspc';
export const features = ['spacedrop', 'p2pPairing', 'syncRoute', 'backups'] as const;
export type FeatureFlag = (typeof features)[number];
// This defines which backend feature flags show up in the UI.
// This is kind of a hack to work around not having a runtime array of the possible features, as Specta only exports the types.
export const backendFeatures: BackendFeature[] = ['syncEmitMessages'];
const featureFlagState = valtioPersist('sd-featureFlags', {
enabled: [] as FeatureFlag[]
});
export type FeatureFlag = (typeof features)[number] | BackendFeature;
const featureFlagState = valtioPersist(
'sd-featureFlags',
{
enabled: [] as FeatureFlag[]
},
{
saveFn(data) {
// Clone so we don't mess with the original data
const data2: typeof data = JSON.parse(JSON.stringify(data));
// Only save frontend flags (backend flags are saved in the backend)
data2.enabled = data2.enabled.filter((f) => features.includes(f as any));
return data2;
}
}
);
export function useLoadBackendFeatureFlags() {
const nodeConfig = useBridgeQuery(['nodeState']);
useEffect(() => {
featureFlagState.enabled = [
// Remove all backend features.
...featureFlagState.enabled.filter((f) => features.includes(f as any)),
// Add back in current state of backend features
...(nodeConfig.data?.features ?? [])
];
}, [nodeConfig.data?.features]);
}
export function useFeatureFlags() {
return useSnapshot(featureFlagState);
@ -31,18 +63,47 @@ export function toggleFeatureFlag(flags: FeatureFlag | FeatureFlag[]) {
flags = [flags];
}
flags.forEach((f) => {
// If not in `features` it must be a backend feature
if (!features.includes(f as any)) {
void (async () => {
// Tauri's `confirm` returns a Promise
// Only prompt when enabling the feature
const result = featureFlagState.enabled.find((ff) => f === ff)
? true
: await confirm(
'This feature will render your database broken and it WILL need to be reset! Use at your own risk!'
);
if (result) {
nonLibraryClient.mutation(['toggleFeatureFlag', f as any]);
}
})();
return;
}
if (!featureFlagState.enabled.find((ff) => f === ff)) {
let message: string | undefined;
if (f === 'p2pPairing') {
alert(
'Pairing will render your database broken and it WILL need to be reset! Use at your own risk!'
);
message =
'This feature will render your database broken and it WILL need to be reset! Use at your own risk!';
} else if (f === 'backups') {
alert(
'Backups are done on your live DB without proper Sqlite snapshotting. This will work but it could result in unintended side effects on the backup!'
);
message =
'Backups are done on your live DB without proper SQLite snapshotting. This will work but it could result in unintended side effects, so be careful!';
}
featureFlagState.enabled.push(f);
if (message) {
void (async () => {
// Tauri's `confirm` returns a promise but it's not typesafe
const result = await confirm(message);
if (result) {
featureFlagState.enabled.push(f);
}
})();
} else {
featureFlagState.enabled.push(f);
}
} else {
featureFlagState.enabled = featureFlagState.enabled.filter((ff) => f !== ff);
}

View file

@ -14,6 +14,12 @@ if (
)
throw new Error('Please ensure you have patched `globalThis` before importing `@sd/client`!');
declare global {
// Tauri's `confirm` returns a Promise, breaking compatibility with the browser API
// export function confirm(): never; // boolean | Promise<boolean>;
export function confirm(): boolean | Promise<boolean>;
}
export * from './hooks';
export * from './rspc';
export * from './core';

View file

@ -11,9 +11,24 @@ export function resetStore<T extends Record<string, any>, E extends Record<strin
}
// The `valtio-persist` library is not working, so this is a small alternative for us to use.
export function valtioPersist<T extends object>(localStorageKey: string, initialObject?: T): T {
export function valtioPersist<T extends object>(
localStorageKey: string,
initialObject?: T,
opts?: {
saveFn?: (data: T) => any;
restoreFn?: (data: any) => T;
}
): T {
const d = localStorage.getItem(localStorageKey);
const p = proxy(d !== null ? JSON.parse(d) : initialObject);
subscribe(p, () => localStorage.setItem(localStorageKey, JSON.stringify(p)));
const p = proxy(
d !== null
? opts?.restoreFn
? opts.restoreFn(JSON.parse(d))
: JSON.parse(d)
: initialObject
);
subscribe(p, () =>
localStorage.setItem(localStorageKey, JSON.stringify(opts?.saveFn ? opts.saveFn(p) : p))
);
return p;
}

View file

@ -3,6 +3,7 @@ import { AlphaRSPCError, initRspc } from '@rspc/client/v2';
import { Context, createReactQueryHooks } from '@rspc/react/v2';
import { QueryClient } from '@tanstack/react-query';
import { PropsWithChildren, createContext, useContext } from 'react';
import { P, match } from 'ts-pattern';
import { LibraryArgs, Procedures } from './core';
import { currentLibraryCache } from './hooks';
@ -96,16 +97,23 @@ export function useInvalidateQuery() {
useBridgeSubscription(['invalidation.listen'], {
onData: (ops) => {
for (const op of ops) {
let key = [op.key];
if (op.arg !== null) {
key = key.concat(op.arg);
}
match(op)
.with({ type: 'single', data: P.select() }, (op) => {
let key = [op.key];
if (op.arg !== null) {
key = key.concat(op.arg);
}
if (op.result !== null) {
context.queryClient.setQueryData(key, op.result);
} else {
context.queryClient.invalidateQueries(key);
}
if (op.result !== null) {
context.queryClient.setQueryData(key, op.result);
} else {
context.queryClient.invalidateQueries(key);
}
})
.with({ type: 'all' }, (op) => {
context.queryClient.invalidateQueries();
})
.exhaustive();
}
}
});

View file

@ -22,6 +22,7 @@
"eslint-plugin-tailwindcss": "^3.12.0",
"eslint-utils": "^3.0.0",
"regexpp": "^3.2.0",
"vite-plugin-comlink": "^3.0.5",
"vite-plugin-html": "^3.2.0",
"vite-plugin-svgr": "^2.2.1"
}

View file

@ -3,6 +3,7 @@ import { defineConfig } from 'vite';
import { createHtmlPlugin } from 'vite-plugin-html';
import svg from 'vite-plugin-svgr';
import tsconfigPaths from 'vite-tsconfig-paths';
import { comlink } from 'vite-plugin-comlink';
import relativeAliasResolver from './relativeAliasResolver';
export default defineConfig({
@ -12,7 +13,8 @@ export default defineConfig({
svg({ svgrOptions: { icon: true } }),
createHtmlPlugin({
minify: true
})
}),
comlink()
],
css: {
modules: {
@ -26,5 +28,8 @@ export default defineConfig({
build: {
outDir: '../dist',
assetsDir: '.'
},
worker: {
plugins: [comlink()]
}
});

View file

@ -120,9 +120,6 @@ importers:
vite:
specifier: ^4.0.4
version: 4.3.9(sass@1.55.0)
vite-plugin-comlink:
specifier: ^3.0.5
version: 3.0.5(comlink@4.4.1)(vite@4.3.9)
vite-plugin-html:
specifier: ^3.2.0
version: 3.2.0(vite@4.3.9)
@ -908,6 +905,9 @@ importers:
regexpp:
specifier: ^3.2.0
version: 3.2.0
vite-plugin-comlink:
specifier: ^3.0.5
version: 3.0.5(comlink@4.4.1)(vite@3.2.7)
vite-plugin-html:
specifier: ^3.2.0
version: 3.2.0(vite@3.2.7)
@ -6909,7 +6909,7 @@ packages:
magic-string: 0.27.0
react-docgen-typescript: 2.2.2(typescript@5.0.4)
typescript: 5.0.4
vite: 4.3.9(less@4.2.0)
vite: 4.3.9(@types/node@18.15.1)
/@jridgewell/gen-mapping@0.3.3:
resolution: {integrity: sha512-HLhSWOLRi875zjjMG/r+Nv0oCW8umGb0BgEhyX3dDX3egwZtB8PqLnjz3yedt8R5StBrzcg4aBpnh8UA9D1BoQ==}
@ -9935,7 +9935,7 @@ packages:
remark-slug: 6.1.0
rollup: 3.28.1
typescript: 5.0.4
vite: 4.3.9(less@4.2.0)
vite: 4.3.9(@types/node@18.15.1)
transitivePeerDependencies:
- supports-color
@ -10535,7 +10535,7 @@ packages:
react: 18.2.0
react-docgen: 6.0.0-alpha.3
react-dom: 18.2.0(react@18.2.0)
vite: 4.3.9(less@4.2.0)
vite: 4.3.9(@types/node@18.15.1)
transitivePeerDependencies:
- '@preact/preset-vite'
- supports-color
@ -24603,7 +24603,7 @@ packages:
vfile-message: 3.1.4
dev: false
/vite-plugin-comlink@3.0.5(comlink@4.4.1)(vite@4.3.9):
/vite-plugin-comlink@3.0.5(comlink@4.4.1)(vite@3.2.7):
resolution: {integrity: sha512-my8BE9GFJEaLc7l3e2SfRUL8JJsN9On8PiW7q4Eyq3g6DHUsNqo5WlS7Butuzc8ngrs24Tf1RC8Xfdda+E5T9w==}
peerDependencies:
comlink: ^4.3.1
@ -24612,7 +24612,7 @@ packages:
comlink: 4.4.1
json5: 2.2.1
magic-string: 0.26.7
vite: 4.3.9(sass@1.55.0)
vite: 3.2.7
dev: true
/vite-plugin-html@3.2.0(vite@3.2.7):
@ -24771,7 +24771,6 @@ packages:
rollup: 3.28.1
optionalDependencies:
fsevents: 2.3.3
dev: true
/vite@4.3.9(less@4.2.0):
resolution: {integrity: sha512-qsTNZjO9NoJNW7KnOrgYwczm0WctJ8m/yqYAMAK9Lxt4SoySUfS5S8ia9K7JHpa3KEeMfyF8LoJ3c5NeBJy6pg==}