Networked sync (#1133)

* temp remove discriminator from tunnel

* i'm lost :(

* Working `Tunnel` + move sync stuff back into p2p

* sync module

* `NetworkedSyncManager`

* Move sync stuff off `P2PManager`

* Test `SyncMessage`

* Library edit + delete hooks

* stable `identity` column

* fix

* plz work

* p2p is hard

* sync identities post pairing

* Remove `p384`

* a couple improvements

* comment out ReqRes

---------

Co-authored-by: Brendan Allan <brendonovich@outlook.com>
Oscar Beaumont 2023-08-02 19:49:50 +08:00 committed by GitHub
parent 13a3fbe486
commit 6c9e08540f
39 changed files with 1588 additions and 594 deletions

Cargo.lock (generated)
View file

@ -610,12 +610,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce"
[[package]]
name = "base16ct"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf"
[[package]]
name = "base36"
version = "0.0.1"
@ -1508,9 +1502,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf4c2f4e1afd912bc40bfd6fed5d9dc1f288e0ba01bfcc835cc5bc3eb13efe15"
dependencies = [
"generic-array",
"rand_core 0.6.4",
"subtle",
"zeroize",
]
[[package]]
@ -1810,18 +1802,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de"
dependencies = [
"const-oid",
"pem-rfc7468 0.6.0",
"zeroize",
]
[[package]]
name = "der"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56acb310e15652100da43d130af8d97b509e95af61aab1c5a7939ef24337ee17"
dependencies = [
"const-oid",
"pem-rfc7468 0.7.0",
"pem-rfc7468",
"zeroize",
]
@ -1934,7 +1915,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer 0.10.4",
"const-oid",
"crypto-common",
"subtle",
]
@ -2059,24 +2039,10 @@ version = "0.14.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c"
dependencies = [
"der 0.6.1",
"elliptic-curve 0.12.3",
"rfc6979 0.3.1",
"signature 1.6.4",
]
[[package]]
name = "ecdsa"
version = "0.16.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0997c976637b606099b9985693efa3581e84e41f5c11ba5255f88711058ad428"
dependencies = [
"der 0.7.6",
"digest 0.10.7",
"elliptic-curve 0.13.5",
"rfc6979 0.4.0",
"signature 2.1.0",
"spki 0.7.2",
"der",
"elliptic-curve",
"rfc6979",
"signature",
]
[[package]]
@ -2085,7 +2051,7 @@ version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91cff35c70bba8a626e3185d8cd48cc11b5437e1a5bcd15b9b5fa3c64b6dfee7"
dependencies = [
"signature 1.6.4",
"signature",
]
[[package]]
@ -2114,39 +2080,18 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3"
dependencies = [
"base16ct 0.1.1",
"base16ct",
"crypto-bigint 0.4.9",
"der 0.6.1",
"der",
"digest 0.10.7",
"ff 0.12.1",
"ff",
"generic-array",
"group 0.12.1",
"group",
"hkdf 0.12.3",
"pem-rfc7468 0.6.0",
"pkcs8 0.9.0",
"pem-rfc7468",
"pkcs8",
"rand_core 0.6.4",
"sec1 0.3.0",
"subtle",
"zeroize",
]
[[package]]
name = "elliptic-curve"
version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "968405c8fdc9b3bf4df0a6638858cc0b52462836ab6b1c87377785dd09cf1c0b"
dependencies = [
"base16ct 0.2.0",
"crypto-bigint 0.5.2",
"digest 0.10.7",
"ff 0.13.0",
"generic-array",
"group 0.13.0",
"hkdf 0.12.3",
"pem-rfc7468 0.7.0",
"pkcs8 0.10.2",
"rand_core 0.6.4",
"sec1 0.7.2",
"sec1",
"subtle",
"zeroize",
]
@ -2364,16 +2309,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "ff"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ded41244b729663b1e574f1b4fb731469f69f79c17667b5d776b16cda0479449"
dependencies = [
"rand_core 0.6.4",
"subtle",
]
[[package]]
name = "ffmpeg-sys-next"
version = "6.0.1"
@ -2772,7 +2707,6 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
"zeroize",
]
[[package]]
@ -2957,18 +2891,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7"
dependencies = [
"ff 0.12.1",
"rand_core 0.6.4",
"subtle",
]
[[package]]
name = "group"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63"
dependencies = [
"ff 0.13.0",
"ff",
"rand_core 0.6.4",
"subtle",
]
@ -5334,8 +5257,8 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594"
dependencies = [
"ecdsa 0.14.8",
"elliptic-curve 0.12.3",
"ecdsa",
"elliptic-curve",
"sha2 0.10.6",
]
@ -5345,20 +5268,8 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc8c5bf642dde52bb9e87c0ecd8ca5a76faac2eeed98dedb7c717997e1080aa"
dependencies = [
"ecdsa 0.14.8",
"elliptic-curve 0.12.3",
"sha2 0.10.6",
]
[[package]]
name = "p384"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70786f51bcc69f6a4c0360e063a4cac5419ef7c5cd5b3c99ad70f3be5ba79209"
dependencies = [
"ecdsa 0.16.7",
"elliptic-curve 0.13.5",
"primeorder",
"ecdsa",
"elliptic-curve",
"sha2 0.10.6",
]
@ -5510,15 +5421,6 @@ dependencies = [
"base64ct",
]
[[package]]
name = "pem-rfc7468"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412"
dependencies = [
"base64ct",
]
[[package]]
name = "percent-encoding"
version = "2.2.0"
@ -5715,18 +5617,8 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba"
dependencies = [
"der 0.6.1",
"spki 0.6.0",
]
[[package]]
name = "pkcs8"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
dependencies = [
"der 0.7.6",
"spki 0.7.2",
"der",
"spki",
]
[[package]]
@ -5842,15 +5734,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
[[package]]
name = "primeorder"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c2fcef82c0ec6eefcc179b978446c399b3cdf73c392c35604e399eee6df1ee3"
dependencies = [
"elliptic-curve 0.13.5",
]
[[package]]
name = "prisma-cli"
version = "0.1.0"
@ -6688,16 +6571,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rfc6979"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2"
dependencies = [
"hmac 0.12.1",
"subtle",
]
[[package]]
name = "rfd"
version = "0.10.0"
@ -7138,6 +7011,7 @@ name = "sd-core-sync"
version = "0.1.0"
dependencies = [
"prisma-client-rust",
"sd-p2p",
"sd-prisma",
"sd-sync",
"sd-utils",
@ -7285,11 +7159,11 @@ dependencies = [
"arc-swap",
"ed25519-dalek",
"flume",
"hex",
"if-watch",
"libp2p",
"libp2p-quic",
"mdns-sd",
"p384 0.13.0",
"rand_core 0.5.1",
"rmp-serde",
"serde",
@ -7364,24 +7238,10 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928"
dependencies = [
"base16ct 0.1.1",
"der 0.6.1",
"base16ct",
"der",
"generic-array",
"pkcs8 0.9.0",
"subtle",
"zeroize",
]
[[package]]
name = "sec1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0aec48e813d6b90b15f0b8948af3c63483992dee44c03e9930b3eebdabe046e"
dependencies = [
"base16ct 0.2.0",
"der 0.7.6",
"generic-array",
"pkcs8 0.10.2",
"pkcs8",
"subtle",
"zeroize",
]
@ -7747,16 +7607,6 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "signature"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e1788eed21689f9cf370582dfc467ef36ed9c707f073528ddafa8d83e3b8500"
dependencies = [
"digest 0.10.7",
"rand_core 0.6.4",
]
[[package]]
name = "simd-adler32"
version = "0.3.5"
@ -7946,17 +7796,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b"
dependencies = [
"base64ct",
"der 0.6.1",
]
[[package]]
name = "spki"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d1e996ef02c474957d681f1b05213dfb0abab947b446a62d37770b23500184a"
dependencies = [
"base64ct",
"der 0.7.6",
"der",
]
[[package]]
@ -9735,23 +9575,23 @@ dependencies = [
"ccm",
"curve25519-dalek 3.2.0",
"der-parser 8.2.0",
"elliptic-curve 0.12.3",
"elliptic-curve",
"hkdf 0.12.3",
"hmac 0.12.1",
"log",
"oid-registry 0.6.1",
"p256",
"p384 0.11.2",
"p384",
"rand 0.8.5",
"rand_core 0.6.4",
"rcgen 0.9.3",
"ring",
"rustls 0.19.1",
"sec1 0.3.0",
"sec1",
"serde",
"sha1",
"sha2 0.10.6",
"signature 1.6.4",
"signature",
"subtle",
"thiserror",
"tokio",

View file

@ -5,13 +5,14 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = []
default = ["emit-messages"]
emit-messages = []
[dependencies]
sd-prisma = { path = "../../../crates/prisma" }
sd-sync = { path = "../../../crates/sync" }
sd-utils = { path = "../../../crates/utils" }
sd-p2p = { path = "../../../crates/p2p" }
prisma-client-rust = { workspace = true }
serde = { workspace = true }
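
An editorial aside: the new `emit-messages` feature above is enabled by default. A crate typically consumes such a flag via `cfg` attributes; a generic, hedged sketch (not taken from this diff, function name illustrative):

// Compiled only when the crate is built with the feature on
// (here it is part of `default`).
#[cfg(feature = "emit-messages")]
fn emit_message(msg: &str) {
    println!("{msg}");
}

// No-op fallback when built with `--no-default-features`.
#[cfg(not(feature = "emit-messages"))]
fn emit_message(_msg: &str) {}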

View file

@ -0,0 +1,169 @@
use sd_p2p::{spacetunnel::Tunnel, PeerId};
use sd_sync::CRDTOperation;
use tokio::sync::mpsc;
use uhlc::NTP64;
use uuid::Uuid;
use crate::Timestamps;
pub struct Actor {
pub events: mpsc::Sender<Event>,
}
#[must_use]
pub enum Request {
Messages {
tunnel: Tunnel,
timestamps: Vec<(Uuid, NTP64)>,
},
Ingest(Vec<CRDTOperation>),
}
#[derive(Debug)]
pub enum Event {
Notification(NotificationEvent),
Messages(MessagesEvent),
}
#[derive(Debug)]
pub struct MessagesEvent {
pub instance_id: Uuid,
pub messages: Vec<CRDTOperation>,
}
#[derive(Debug)]
pub struct NotificationEvent {
pub tunnel: Tunnel,
}
#[derive(Debug)]
pub enum State {
WaitingForNotification,
ExecutingMessagesRequest,
Ingesting,
}
#[macro_export]
macro_rules! wait {
($rx:ident, $pattern:pat $(=> $expr:expr)?) => {
loop {
match $rx.recv().await {
Some($pattern) => break $($expr)?,
_ => continue
}
}
};
}
impl Actor {
pub fn spawn(timestamps: Timestamps) -> (Self, mpsc::Receiver<Request>) {
let (req_tx, req_rx) = mpsc::channel(4);
let (events_tx, mut events_rx) = mpsc::channel(4);
tokio::spawn(async move {
let mut state = State::WaitingForNotification;
loop {
dbg!(&state);
state = match state {
State::WaitingForNotification => {
let notification = wait!(events_rx, Event::Notification(n) => n);
// req_tx.send(Request::Messages(tunnel, peer_id, 69));
// let notification = wait!(
// events_rx,
// Incoming::Notification(notification) => notification
// );
req_tx
.send(Request::Messages {
tunnel: notification.tunnel,
timestamps: timestamps
.read()
.await
.iter()
.map(|(&k, &v)| (k, v))
.collect(),
})
.await
.ok();
State::ExecutingMessagesRequest
}
State::ExecutingMessagesRequest => {
let event = wait!(events_rx, Event::Messages(event) => event);
req_tx
.send(Request::Ingest(event.messages.clone()))
.await
.ok();
dbg!(&event.messages);
State::Ingesting
}
State::Ingesting => {
println!("Ingested!");
State::WaitingForNotification
}
};
}
});
(Self { events: events_tx }, req_rx)
}
pub async fn notify(&self, tunnel: Tunnel, _peer_id: PeerId) {
self.events
.send(Event::Notification(NotificationEvent { tunnel }))
.await
.ok();
}
}
// #[must_use]
// pub struct ReqRes<TReq, TResp> {
// request: TReq,
// response_sender: oneshot::Sender<TResp>,
// }
// impl<TReq, TResp> ReqRes<TReq, TResp> {
// pub async fn send<TContainer>(
// request: TReq,
// container_fn: impl Fn(Self) -> TContainer,
// sender: &mpsc::Sender<TContainer>,
// ) -> TResp {
// let (tx, rx) = oneshot::channel();
// let payload = container_fn(Self {
// request,
// response_sender: tx,
// });
// sender.send(payload).await.ok();
// rx.await.unwrap()
// }
// #[must_use]
// pub fn split(self) -> (TReq, impl FnOnce(TResp)) {
// (self.request, |response| {
// self.response_sender.send(response).ok();
// })
// }
// pub async fn map<
// TFn: FnOnce(TReq) -> TFut,
// TFut: Future<Output = Result<TResp, TErr>>,
// TErr,
// >(
// self,
// func: TFn,
// ) -> Result<(), TErr> {
// self.response_sender.send(func(self.request).await?).ok();
// Ok(())
// }
// }
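
An editorial aside: the `wait!` macro defined near the top of this file drives the actor's state machine. It polls the channel until a message matches the given pattern, optionally breaking with a value bound by that pattern. A self-contained usage sketch (assumes the `tokio` runtime with the `macros` and `rt` features; all names are illustrative):

use tokio::sync::mpsc;

enum Event {
    Ping,
    Payload(u32),
}

macro_rules! wait {
    ($rx:ident, $pattern:pat $(=> $expr:expr)?) => {
        loop {
            match $rx.recv().await {
                Some($pattern) => break $($expr)?,
                _ => continue,
            }
        }
    };
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(4);
    tx.send(Event::Ping).await.ok();
    tx.send(Event::Payload(42)).await.ok();
    // `Ping` does not match, so it is skipped; the loop breaks with 42.
    let n = wait!(rx, Event::Payload(n) => n);
    assert_eq!(n, 42);
}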

View file

@ -1,5 +1,7 @@
#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Brendan remove this once you've got error handling here
pub mod ingest;
use sd_prisma::{prisma::*, prisma_sync::ModelSyncData};
use sd_sync::*;
use sd_utils::uuid_to_bytes;
@ -10,40 +12,60 @@ use std::{
};
use serde_json::to_vec;
use tokio::sync::broadcast::{self, Receiver, Sender};
use uhlc::{HLCBuilder, Timestamp, HLC, NTP64};
use tokio::sync::{
broadcast::{self},
mpsc, RwLock,
};
use uhlc::{HLCBuilder, Timestamp, HLC};
use uuid::Uuid;
pub use sd_prisma::prisma_sync;
pub use uhlc::NTP64;
#[derive(Clone)]
pub enum SyncMessage {
Ingested(CRDTOperation),
Created(CRDTOperation),
Ingested,
Created,
}
pub type Timestamps = Arc<RwLock<HashMap<Uuid, NTP64>>>;
pub struct SyncManager {
db: Arc<PrismaClient>,
instance: Uuid,
_clocks: HashMap<Uuid, NTP64>,
pub instance: Uuid,
// TODO: Remove `Mutex` and store this on `ingest` actor
timestamps: Timestamps,
clock: HLC,
pub tx: Sender<SyncMessage>,
pub tx: broadcast::Sender<SyncMessage>,
pub ingest: ingest::Actor,
}
pub struct SyncManagerNew {
pub manager: SyncManager,
pub rx: broadcast::Receiver<SyncMessage>,
pub ingest_rx: mpsc::Receiver<ingest::Request>,
}
impl SyncManager {
pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> (Self, Receiver<SyncMessage>) {
pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> SyncManagerNew {
let (tx, rx) = broadcast::channel(64);
(
Self {
let timestamps: Timestamps = Default::default();
let (ingest, ingest_rx) = ingest::Actor::spawn(timestamps.clone());
SyncManagerNew {
manager: Self {
db: db.clone(),
instance,
clock: HLCBuilder::new().with_id(instance.into()).build(),
_clocks: Default::default(),
timestamps,
tx,
ingest,
},
rx,
)
ingest_rx,
}
}
pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>(
@ -72,9 +94,7 @@ impl SyncManager {
let (res, _) = tx._batch((queries, (shared, relation))).await?;
for op in _ops {
self.tx.send(SyncMessage::Created(op)).ok();
}
self.tx.send(SyncMessage::Created).ok();
res
};
@ -104,7 +124,7 @@ impl SyncManager {
CRDTOperationType::Relation(inner) => exec!(relation_op_db, inner),
};
self.tx.send(SyncMessage::Created(op)).ok();
self.tx.send(SyncMessage::Created).ok();
ret
};
@ -114,7 +134,10 @@ impl SyncManager {
Ok(ret)
}
pub async fn get_ops(&self) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
pub async fn get_ops(
&self,
args: GetOpsArgs,
) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
let Self { db, .. } = self;
shared_operation::include!(shared_include {
@ -175,13 +198,33 @@ impl SyncManager {
}
}
macro_rules! db_args {
($op:ident) => {
vec![prisma_client_rust::operator::or(
args.clocks
.iter()
.map(|(instance_id, timestamp)| {
prisma_client_rust::and![
$op::instance::is(vec![instance::pub_id::equals(uuid_to_bytes(
*instance_id
))]),
$op::timestamp::gte(timestamp.as_u64() as i64)
]
})
.collect(),
)]
};
}
let (shared, relation) = db
._batch((
db.shared_operation()
.find_many(vec![])
.find_many(db_args!(shared_operation))
.take(args.count as i64)
.include(shared_include::include()),
db.relation_operation()
.find_many(vec![])
.find_many(db_args!(relation_operation))
.take(args.count as i64)
.include(relation_include::include()),
))
.await?;
@ -203,8 +246,9 @@ impl SyncManager {
Ok(ops
.into_values()
.map(DbOperation::into_operation)
.rev()
.take(args.count as usize)
.map(DbOperation::into_operation)
.collect())
}
@ -229,7 +273,7 @@ impl SyncManager {
}
}
self.tx.send(SyncMessage::Ingested(op.clone())).ok();
self.tx.send(SyncMessage::Ingested).ok();
Ok(())
}
@ -281,15 +325,13 @@ impl SyncManager {
.unwrap_or_default()
}
pub async fn receive_crdt_operation(&mut self, op: CRDTOperation) {
pub async fn receive_crdt_operation(&self, op: CRDTOperation) {
self.clock
.update_with_timestamp(&Timestamp::new(op.timestamp, op.instance.into()))
.ok();
let timestamp = self
._clocks
.entry(op.instance)
.or_insert_with(|| op.timestamp);
let mut clocks = self.timestamps.write().await;
let timestamp = clocks.entry(op.instance).or_insert_with(|| op.timestamp);
if *timestamp < op.timestamp {
*timestamp = op.timestamp;
@ -314,6 +356,16 @@ impl SyncManager {
.await
.ok();
}
pub async fn register_instance(&self, instance_id: Uuid) {
self.timestamps.write().await.insert(instance_id, NTP64(0));
}
}
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
pub struct GetOpsArgs {
pub clocks: Vec<(Uuid, NTP64)>,
pub count: u32,
}
fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> shared_operation::Create {
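
An editorial aside on `receive_crdt_operation` above: besides feeding the operation's timestamp into the HLC, it only ever moves each instance's stored timestamp forward. A self-contained sketch of that rule, using plain `u64`s in place of `uhlc::NTP64` and `u8` ids in place of `Uuid` (illustrative only):

use std::collections::HashMap;

type InstanceId = u8; // stand-in for `Uuid`

fn observe(timestamps: &mut HashMap<InstanceId, u64>, instance: InstanceId, ts: u64) {
    // Record the first timestamp seen for an instance, then only advance it.
    let entry = timestamps.entry(instance).or_insert(ts);
    if *entry < ts {
        *entry = ts;
    }
}

fn main() {
    let mut timestamps = HashMap::new();
    observe(&mut timestamps, 1, 10);
    observe(&mut timestamps, 1, 5); // older timestamp, ignored
    observe(&mut timestamps, 1, 20);
    assert_eq!(timestamps[&1], 20);
}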

View file

@ -0,0 +1,221 @@
use sd_core_sync::*;
use sd_prisma::{prisma, prisma_sync};
use sd_sync::*;
use sd_utils::uuid_to_bytes;
use prisma_client_rust::chrono::Utc;
use serde_json::json;
use std::{sync::Arc, time::Duration};
use tokio::sync::{broadcast, mpsc};
use uuid::Uuid;
fn db_path(id: Uuid) -> String {
format!("./tests/test-{id}.db")
}
#[derive(Clone)]
struct Instance {
id: Uuid,
_peer_id: sd_p2p::PeerId,
db: Arc<prisma::PrismaClient>,
sync: Arc<SyncManager>,
}
impl Instance {
async fn new(
id: Uuid,
) -> (
Arc<Self>,
broadcast::Receiver<SyncMessage>,
mpsc::Receiver<ingest::Request>,
) {
let db = Arc::new(
prisma::PrismaClient::_builder()
.with_url(format!("file:{}", db_path(id)))
.build()
.await
.unwrap(),
);
db._db_push().await.unwrap();
db.instance()
.create(
uuid_to_bytes(id),
vec![],
vec![],
format!("Instace {id}"),
0,
Utc::now().into(),
Utc::now().into(),
vec![],
)
.exec()
.await
.unwrap();
let sync = sd_core_sync::SyncManager::new(&db, id);
(
Arc::new(Self {
id,
db,
_peer_id: sd_p2p::PeerId::random(),
sync: Arc::new(sync.manager),
}),
sync.rx,
sync.ingest_rx,
)
}
async fn teardown(&self) {
tokio::fs::remove_file(db_path(self.id)).await.unwrap();
}
async fn pair(left: &Self, right: &Self) {
left.db
.instance()
.create(
uuid_to_bytes(right.id),
vec![],
vec![],
"".to_string(),
0,
Utc::now().into(),
Utc::now().into(),
vec![],
)
.exec()
.await
.unwrap();
right
.db
.instance()
.create(
uuid_to_bytes(left.id),
vec![],
vec![],
"".to_string(),
0,
Utc::now().into(),
Utc::now().into(),
vec![],
)
.exec()
.await
.unwrap();
left.sync.register_instance(right.id).await;
right.sync.register_instance(left.id).await;
}
}
#[tokio::test]
async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
let (instance1, mut sync_rx1, _) = Instance::new(Uuid::new_v4()).await;
let (instance2, _, mut ingest_rx2) = Instance::new(Uuid::new_v4()).await;
Instance::pair(&instance1, &instance2).await;
tokio::spawn({
let _instance1 = instance1.clone();
let instance2 = instance2.clone();
async move {
while let Ok(msg) = sync_rx1.recv().await {
match msg {
SyncMessage::Created => instance2
.sync
.ingest
.events
.send(ingest::Event::Notification(ingest::NotificationEvent {
tunnel: todo!(),
}))
.await
.unwrap(),
_ => {}
}
}
}
});
tokio::spawn({
let instance1 = instance1.clone();
let instance2 = instance2.clone();
async move {
while let Some(msg) = ingest_rx2.recv().await {
match msg {
ingest::Request::Messages { timestamps, .. } => {
let messages = instance1
.sync
.get_ops(GetOpsArgs {
clocks: timestamps,
count: 100,
})
.await
.unwrap();
instance2
.sync
.ingest
.events
.send(ingest::Event::Messages(ingest::MessagesEvent {
messages,
instance_id: instance1.id,
}))
.await
.unwrap();
}
_ => {}
}
}
}
});
instance1
.sync
.write_ops(&instance1.db, {
let id = Uuid::new_v4();
use prisma::location;
macro_rules! item {
($name:ident, $value:expr) => {
(
(location::$name::NAME, json!($value)),
location::$name::set(Some($value.to_string())),
)
};
}
let (sync_ops, db_ops): (Vec<_>, Vec<_>) = [
item!(name, "Location 0"),
item!(path, "/User/Brendan/Documents"),
]
.into_iter()
.unzip();
(
instance1.sync.shared_create(
prisma_sync::location::SyncId {
pub_id: uuid_to_bytes(id),
},
sync_ops,
),
instance1.db.location().create(uuid_to_bytes(id), db_ops),
)
})
.await?;
tokio::time::sleep(Duration::from_millis(10)).await;
// assert_eq!(out.len(), 3);
// assert!(matches!(out[0].typ, CRDTOperationType::Shared(_)));
instance1.teardown().await;
instance2.teardown().await;
Ok(())
}

View file

@ -4,15 +4,15 @@ datasource db {
}
generator client {
provider = "cargo prisma"
output = "../../crates/prisma/src/prisma"
module_path = "sd_prisma::prisma"
provider = "cargo prisma"
output = "../../crates/prisma/src/prisma"
module_path = "sd_prisma::prisma"
client_format = "folder"
}
generator sync {
provider = "cargo prisma-sync"
output = "../../crates/prisma/src/prisma_sync"
provider = "cargo prisma-sync"
output = "../../crates/prisma/src/prisma_sync"
client_format = "folder"
}
@ -41,11 +41,11 @@ model RelationOperation {
timestamp BigInt
relation String
item_id Bytes
item_id Bytes
group_id Bytes
kind String
data Bytes
kind String
data Bytes
instance_id Int
instance Instance @relation(fields: [instance_id], references: [id])
@ -73,7 +73,7 @@ model Node {
model Instance {
id Int @id @default(autoincrement()) // This is NOT globally unique
pub_id Bytes @unique // This UUID is meaningless and exists solely because the `uhlc::ID` must be 16 bytes. Really this should be derived from the `identity` field.
// Enum: sd_p2p::identity::Identity
// Enum: sd_core::p2p::IdentityOrRemoteIdentity
identity Bytes
node_id Bytes
@ -84,12 +84,12 @@ model Instance {
last_seen DateTime // Time core started for owner, last P2P message for P2P node
date_created DateTime
// clock timestamp for sync
// clock timestamp for sync
timestamp BigInt?
// attestation Bytes
SharedOperation SharedOperation[]
SharedOperation SharedOperation[]
RelationOperation RelationOperation[]
@@map("instance")
@ -163,7 +163,7 @@ model FilePath {
// location that owns this path
location_id Int?
location Location? @relation(fields: [location_id], references: [id], onDelete: Cascade, onUpdate: Cascade)
location Location? @relation(fields: [location_id], references: [id], onDelete: SetNull)
// the path of the file relative to its location
materialized_path String?
@ -180,7 +180,7 @@ model FilePath {
// the unique Object for this file path
object_id Int?
object Object? @relation(fields: [object_id], references: [id], onDelete: Restrict)
object Object? @relation(fields: [object_id], references: [id], onDelete: SetNull)
key_id Int? // replacement for encryption
// permissions String?
@ -305,7 +305,7 @@ model MediaData {
codecs String? // eg: "h264,acc"
streams Int?
object Object? @relation(fields: [id], references: [id], onDelete: Cascade, onUpdate: Cascade)
object Object? @relation(fields: [id], references: [id], onDelete: Cascade)
@@map("media_data")
}
@ -333,10 +333,10 @@ model Tag {
/// @relation(item: tag, group: object)
model TagOnObject {
tag_id Int
tag Tag @relation(fields: [tag_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
tag Tag @relation(fields: [tag_id], references: [id], onDelete: Restrict)
object_id Int
object Object @relation(fields: [object_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
object Object @relation(fields: [object_id], references: [id], onDelete: Restrict)
@@id([tag_id, object_id])
@@map("tag_on_object")
@ -360,10 +360,10 @@ model LabelOnObject {
date_created DateTime @default(now())
label_id Int
label Label @relation(fields: [label_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
label Label @relation(fields: [label_id], references: [id], onDelete: Restrict)
object_id Int
object Object @relation(fields: [object_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
object Object @relation(fields: [object_id], references: [id], onDelete: Restrict)
@@id([label_id, object_id])
@@map("label_on_object")
@ -386,10 +386,10 @@ model Space {
model ObjectInSpace {
space_id Int
space Space @relation(fields: [space_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
space Space @relation(fields: [space_id], references: [id], onDelete: Restrict)
object_id Int
object Object @relation(fields: [object_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
object Object @relation(fields: [object_id], references: [id], onDelete: Restrict)
@@id([space_id, object_id])
@@map("object_in_space")
@ -422,7 +422,7 @@ model Job {
date_started DateTime? // Started execution
date_completed DateTime? // Finished execution
parent Job? @relation("jobs_dependency", fields: [parent_id], references: [id], onDelete: Cascade, onUpdate: Cascade)
parent Job? @relation("jobs_dependency", fields: [parent_id], references: [id], onDelete: SetNull)
children Job[] @relation("jobs_dependency")
@@map("job")
@ -448,10 +448,10 @@ model Job {
// date_created DateTime @default(now())
// album_id Int
// album Album @relation(fields: [album_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
// album Album @relation(fields: [album_id], references: [id], onDelete: SetNull)
// object_id Int
// object Object @relation(fields: [object_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
// object Object @relation(fields: [object_id], references: [id], onDelete: SetNull)
// @@id([album_id, object_id])
// @@map("object_in_album")
@ -490,10 +490,10 @@ model IndexerRule {
model IndexerRulesInLocation {
location_id Int
location Location @relation(fields: [location_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
location Location @relation(fields: [location_id], references: [id], onDelete: Restrict)
indexer_rule_id Int
indexer_rule IndexerRule @relation(fields: [indexer_rule_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
indexer_rule IndexerRule @relation(fields: [indexer_rule_id], references: [id], onDelete: Restrict)
@@id([location_id, indexer_rule_id])
@@map("indexer_rule_in_location")

View file

@ -1,4 +1,8 @@
use crate::{library::LibraryName, util::MaybeUndefined, volume::get_volumes};
use crate::{
library::{LibraryConfigWrapped, LibraryName},
util::MaybeUndefined,
volume::get_volumes,
};
use chrono::Utc;
use rspc::alpha::AlphaRouter;
@ -89,14 +93,17 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
R.mutation(|ctx, args: CreateLibraryArgs| async move {
debug!("Creating library");
let new_library = ctx
let library = ctx
.library_manager
.create(args.name, None, ctx.config.get().await)
.await?;
debug!("Created library {}", new_library.uuid);
debug!("Created library {}", library.id);
Ok(new_library)
Ok(LibraryConfigWrapped {
uuid: library.id,
config: library.config.clone(),
})
})
})
.procedure("edit", {

View file

@ -1,5 +1,5 @@
use rspc::alpha::AlphaRouter;
use sd_core_sync::SyncMessage;
use sd_core_sync::GetOpsArgs;
use super::{utils::library, Ctx, R};
@ -11,17 +11,24 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
async_stream::stream! {
let mut rx = library.sync.tx.subscribe();
while let Ok(msg) = rx.recv().await {
let op = match msg {
SyncMessage::Ingested(op) => op,
SyncMessage::Created(op) => op
};
yield op;
// let op = match msg {
// SyncMessage::Ingested => (),
// SyncMessage::Created => op
// };
yield ();
}
}
})
})
.procedure("messages", {
R.with2(library())
.query(|(_, library), _: ()| async move { Ok(library.sync.get_ops().await?) })
R.with2(library()).query(|(_, library), _: ()| async move {
Ok(library
.sync
.get_ops(GetOpsArgs {
clocks: vec![],
count: 1000,
})
.await?)
})
})
}

View file

@ -6,7 +6,7 @@ use crate::{
library::LibraryManager,
location::{LocationManager, LocationManagerError},
node::NodeConfigManager,
p2p::P2PManager,
p2p::{sync::NetworkedLibraryManager, P2PManager},
};
use api::notifications::{Notification, NotificationData, NotificationId};
@ -49,9 +49,10 @@ pub struct NodeServices {
pub config: Arc<NodeConfigManager>,
pub job_manager: Arc<JobManager>,
pub location_manager: LocationManager,
pub p2p: P2PManager,
pub p2p: Arc<P2PManager>,
pub event_bus: (broadcast::Sender<CoreEvent>, broadcast::Receiver<CoreEvent>),
pub notifications: NotificationManager,
pub nlm: Arc<NetworkedLibraryManager>,
}
/// Represents a single running instance of the Spacedrive core.
@ -89,19 +90,18 @@ impl Node {
let (p2p, p2p_stream) = P2PManager::new(config.clone()).await?;
let services = Arc::new(NodeServices {
config,
job_manager: JobManager::new(),
location_manager: LocationManager::new(),
p2p,
event_bus: event_bus,
nlm: NetworkedLibraryManager::new(p2p.clone()),
notifications: NotificationManager::new(),
p2p,
config,
event_bus,
});
let library_manager = LibraryManager::new(data_dir.join("libraries"), services).await?;
let node = Arc::new(Node {
data_dir: data_dir.to_path_buf(),
library_manager,
library_manager: LibraryManager::new(data_dir.join("libraries"), services).await?,
});
#[cfg(debug_assertions)]
@ -111,7 +111,8 @@ impl Node {
.await?;
}
node.p2p.start(p2p_stream, node.clone());
node.p2p
.start(p2p_stream, node.library_manager.clone(), node.nlm.clone());
let router = api::mount();
info!("Spacedrive online.");

View file

@ -1,5 +1,6 @@
use crate::{
node::{NodeConfig, Platform},
p2p::IdentityOrRemoteIdentity,
prisma::{file_path, indexer_rule, PrismaClient},
util::{
db::maybe_missing,
@ -35,7 +36,7 @@ pub struct LibraryConfig {
#[async_trait::async_trait]
impl Migrate for LibraryConfig {
const CURRENT_VERSION: u32 = 8;
const CURRENT_VERSION: u32 = 9;
type Ctx = (NodeConfig, Arc<PrismaClient>);
@ -233,6 +234,30 @@ impl Migrate for LibraryConfig {
config.remove("instance_id");
config.insert("instance_id".into(), Value::Number(instance.id.into()));
}
9 => {
db._batch(
db.instance()
.find_many(vec![])
.exec()
.await?
.into_iter()
.map(|i| {
db.instance().update(
instance::id::equals(i.id),
vec![instance::identity::set(
// This code is assuming you only have the current node.
// If you've paired your node with another node, reset your db.
IdentityOrRemoteIdentity::Identity(
Identity::from_bytes(&i.identity).unwrap(),
)
.to_bytes(),
)],
)
})
.collect::<Vec<_>>(),
)
.await?;
}
v => unreachable!("Missing migration for library version {}", v),
}

View file

@ -8,10 +8,7 @@ use crate::{
LocationManager,
},
node::NodeConfigManager,
object::{
orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path,
thumbnail_remover::ThumbnailRemoverActorProxy,
},
object::{orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path},
prisma::{file_path, location, PrismaClient},
util::{db::maybe_missing, error::FileIOError},
NodeServices,
@ -40,9 +37,10 @@ pub struct Library {
pub id: Uuid,
/// config holds the configuration of the current library.
pub config: LibraryConfig,
pub manager: Arc<LibraryManager>,
/// db holds the database client for the current library.
pub db: Arc<PrismaClient>,
pub sync: sd_core_sync::SyncManager,
pub sync: Arc<sd_core_sync::SyncManager>,
/// key manager that provides encryption keys to functions that require them
// pub key_manager: Arc<KeyManager>,
/// holds the node context for the node which this library is running on.
@ -50,7 +48,6 @@ pub struct Library {
/// p2p identity
pub identity: Arc<Identity>,
pub orphan_remover: OrphanRemoverActor,
pub thumbnail_remover_proxy: ThumbnailRemoverActorProxy,
}
impl Debug for Library {
@ -66,40 +63,65 @@ impl Debug for Library {
}
impl Library {
pub fn new(
pub async fn new(
id: Uuid,
instance_id: Uuid,
config: LibraryConfig,
identity: Arc<Identity>,
db: Arc<PrismaClient>,
library_manager: Arc<LibraryManager>,
// node_context: Arc<NodeContext>,
) -> Self {
let (sync_manager, mut sync_rx) = SyncManager::new(&db, instance_id);
let node_context = library_manager.node.clone();
manager: Arc<LibraryManager>,
) -> Arc<Self> {
let mut sync = SyncManager::new(&db, instance_id);
let library = Self {
orphan_remover: OrphanRemoverActor::spawn(db.clone()),
thumbnail_remover_proxy: library_manager.thumbnail_remover_proxy(),
let library = Arc::new(Self {
id,
db,
config,
node: node_context,
manager: manager.clone(),
db: db.clone(),
sync: Arc::new(sync.manager),
node: manager.node.clone(),
// key_manager,
sync: sync_manager,
identity: identity.clone(),
};
orphan_remover: OrphanRemoverActor::spawn(db),
});
manager.node.nlm.load_library(&library).await;
tokio::spawn({
async move {
while let Ok(op) = sync_rx.recv().await {
let SyncMessage::Created(op) = op else { continue; };
let library = library.clone();
library_manager
.node
.p2p
.broadcast_sync_events(id, &identity, vec![op], &library_manager)
.await;
async move {
loop {
tokio::select! {
req = sync.ingest_rx.recv() => {
use sd_core_sync::ingest::Request;
let Some(req) = req else { continue; };
match req {
Request::Messages { tunnel, timestamps } => {
manager.node.nlm.request_and_ingest_ops(
tunnel,
sd_core_sync::GetOpsArgs { clocks: timestamps, count: 100 },
&library.sync,
library.id
).await;
},
Request::Ingest(ops) => {
for op in ops.into_iter() {
library.sync.receive_crdt_operation(op).await;
}
}
}
},
msg = sync.rx.recv() => {
if let Ok(op) = msg {
let SyncMessage::Created = op else { continue; };
manager.node.nlm.alert_new_ops(id, &library.sync).await;
}
},
}
}
}
});

View file

@ -2,11 +2,8 @@ use crate::{
invalidate_query,
location::{indexer, LocationManagerError},
node::{NodeConfig, Platform},
object::{
preview::get_thumbnails_directory,
tag,
thumbnail_remover::{ThumbnailRemoverActor, ThumbnailRemoverActorProxy},
},
object::{preview::get_thumbnails_directory, tag, thumbnail_remover::ThumbnailRemoverActor},
p2p::{IdentityOrRemoteIdentity, IdentityOrRemoteIdentityErr},
prisma::location,
util::{
db::{self, MissingFieldError},
@ -24,7 +21,7 @@ use std::{
};
use chrono::Utc;
use sd_p2p::spacetunnel::{Identity, IdentityErr};
use sd_p2p::spacetunnel::Identity;
use sd_prisma::prisma::instance;
use thiserror::Error;
use tokio::{fs, io, sync::RwLock, try_join};
@ -42,7 +39,7 @@ pub struct LibraryManager {
/// holds the context for the node which this library manager is running on.
pub node: Arc<NodeServices>,
/// An actor that removes stale thumbnails from the file system
thumbnail_remover: ThumbnailRemoverActor,
pub thumbnail_remover: ThumbnailRemoverActor,
}
#[derive(Error, Debug)]
@ -74,7 +71,9 @@ pub enum LibraryManagerError {
#[error("failed to watch locations: {0}")]
LocationWatcher(#[from] LocationManagerError),
#[error("failed to parse library p2p identity: {0}")]
Identity(#[from] IdentityErr),
Identity(#[from] IdentityOrRemoteIdentityErr),
#[error("failed to load private key for instance p2p identity")]
InvalidIdentity,
#[error("current instance with id '{0}' was not found in the database")]
CurrentInstanceNotFound(String),
#[error("missing-field: {0}")]
@ -160,18 +159,14 @@ impl LibraryManager {
Ok(this)
}
pub fn thumbnail_remover_proxy(&self) -> ThumbnailRemoverActorProxy {
self.thumbnail_remover.proxy()
}
/// create creates a new library with the given config and mounts it into the running [LibraryManager].
pub(crate) async fn create(
self: &Arc<Self>,
name: LibraryName,
description: Option<String>,
node_cfg: NodeConfig,
) -> Result<LibraryConfigWrapped, LibraryManagerError> {
self.create_with_uuid(Uuid::new_v4(), name, description, node_cfg, true)
) -> Result<Arc<Library>, LibraryManagerError> {
self.create_with_uuid(Uuid::new_v4(), name, description, node_cfg, true, None)
.await
}
@ -182,7 +177,9 @@ impl LibraryManager {
description: Option<String>,
node_cfg: NodeConfig,
should_seed: bool,
) -> Result<LibraryConfigWrapped, LibraryManagerError> {
// `None` will fallback to default as library must be created with at least one instance
instance: Option<instance::Create>,
) -> Result<Arc<Library>, LibraryManagerError> {
if name.as_ref().is_empty() || name.as_ref().chars().all(|x| x.is_whitespace()) {
return Err(LibraryManagerError::InvalidConfig(
"name cannot be empty".to_string(),
@ -192,7 +189,8 @@ impl LibraryManager {
let config = LibraryConfig {
name,
description,
instance_id: 0, // First instance will always be zero
// First instance will be zero
instance_id: 0,
};
let config_path = self.libraries_dir.join(format!("{id}.sdlibrary"));
@ -210,16 +208,20 @@ impl LibraryManager {
id,
self.libraries_dir.join(format!("{id}.db")),
config_path,
Some(instance::Create {
pub_id: Uuid::new_v4().as_bytes().to_vec(),
identity: Identity::new().to_bytes(),
node_id: node_cfg.id.as_bytes().to_vec(),
node_name: node_cfg.name.clone(),
node_platform: Platform::current() as i32,
last_seen: now,
date_created: now,
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![instance::id::set(config.instance_id)],
Some({
let mut create = instance.unwrap_or_else(|| instance::Create {
pub_id: Uuid::new_v4().as_bytes().to_vec(),
identity: IdentityOrRemoteIdentity::Identity(Identity::new()).to_bytes(),
node_id: node_cfg.id.as_bytes().to_vec(),
node_name: node_cfg.name.clone(),
node_platform: Platform::current() as i32,
last_seen: now,
date_created: now,
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![],
});
create._params.push(instance::id::set(config.instance_id));
create
}),
should_seed,
)
@ -235,7 +237,7 @@ impl LibraryManager {
invalidate_query!(library, "library.list");
Ok(LibraryConfigWrapped { uuid: id, config })
Ok(library)
}
pub(crate) async fn get_all_libraries(&self) -> Vec<Arc<Library>> {
@ -248,16 +250,12 @@ impl LibraryManager {
.await
.iter()
.map(|lib| LibraryConfigWrapped {
config: lib.config.clone(),
uuid: lib.id,
config: lib.config.clone(),
})
.collect()
}
// pub(crate) async fn get_all_instances(&self) -> Vec<instance::Data> {
// vec![] // TODO: Cache in memory
// }
pub(crate) async fn edit(
&self,
id: Uuid,
@ -284,6 +282,8 @@ impl LibraryManager {
LibraryConfig::save(&config, &self.libraries_dir.join(format!("{id}.sdlibrary")))?;
self.node.nlm.edit_library(&library).await;
invalidate_query!(library, "library.list");
for library in libraries.iter() {
@ -323,10 +323,12 @@ impl LibraryManager {
.position(|l| l.id == id)
.ok_or(LibraryManagerError::LibraryNotFound)?;
let library_id = libraries_write_guard[library_idx].id;
let library = &*libraries_write_guard[library_idx];
let db_path = self.libraries_dir.join(format!("{}.db", library_id));
let sd_lib_path = self.libraries_dir.join(format!("{}.sdlibrary", library_id));
self.node.nlm.delete_library(library).await;
let db_path = self.libraries_dir.join(format!("{}.db", library.id));
let sd_lib_path = self.libraries_dir.join(format!("{}.sdlibrary", library.id));
try_join!(
async {
@ -346,7 +348,7 @@ impl LibraryManager {
// We only remove here after files deletion
let library = libraries_write_guard.remove(library_idx);
info!("Removed Library <id='{library_id}'>");
info!("Removed Library <id='{}'>", library.id);
invalidate_query!(library, "library.list");
@ -398,7 +400,14 @@ impl LibraryManager {
.ok_or_else(|| {
LibraryManagerError::CurrentInstanceNotFound(config.instance_id.to_string())
})?;
let identity = Arc::new(Identity::from_bytes(&instance.identity)?);
let identity = Arc::new(
match IdentityOrRemoteIdentity::from_bytes(&instance.identity)? {
IdentityOrRemoteIdentity::Identity(identity) => identity,
IdentityOrRemoteIdentity::RemoteIdentity(_) => {
return Err(LibraryManagerError::InvalidIdentity)
}
},
);
let instance_id = Uuid::from_slice(&instance.pub_id)?;
let curr_platform = Platform::current() as i32;
@ -430,7 +439,7 @@ impl LibraryManager {
// let key_manager = Arc::new(KeyManager::new(vec![]).await?);
// seed_keymanager(&db, &key_manager).await?;
let library = Arc::new(Library::new(
let library = Library::new(
id,
instance_id,
config,
@ -438,7 +447,8 @@ impl LibraryManager {
// key_manager,
db,
self.clone(),
));
)
.await;
self.thumbnail_remover.new_library(&library).await;
self.libraries.write().await.push(Arc::clone(&library));

View file

@ -199,7 +199,8 @@ impl StatefulJob for IndexerJobInit {
let to_remove = to_remove.collect::<Vec<_>>();
ctx.library
.thumbnail_remover_proxy
.manager
.thumbnail_remover
.remove_cas_ids(
to_remove
.iter()

View file

@ -81,7 +81,8 @@ pub async fn shallow(
};
library
.thumbnail_remover_proxy
.manager
.thumbnail_remover
.remove_cas_ids(
to_remove
.iter()

View file

@ -41,31 +41,6 @@ enum ThumbnailRemoverActorError {
NonUtf8Path(#[from] NonUtf8PathError),
}
#[derive(Clone)]
pub struct ThumbnailRemoverActorProxy {
cas_ids_to_delete_tx: chan::Sender<Vec<String>>,
non_indexed_thumbnails_cas_ids_tx: chan::Sender<String>,
}
impl ThumbnailRemoverActorProxy {
pub async fn new_non_indexed_thumbnail(&self, cas_id: String) {
if self
.non_indexed_thumbnails_cas_ids_tx
.send(cas_id)
.await
.is_err()
{
error!("Thumbnail remover actor is dead");
}
}
pub async fn remove_cas_ids(&self, cas_ids: Vec<String>) {
if self.cas_ids_to_delete_tx.send(cas_ids).await.is_err() {
error!("Thumbnail remover actor is dead");
}
}
}
#[derive(Debug)]
enum DatabaseMessage {
Add(Uuid, Arc<PrismaClient>),
@ -123,7 +98,7 @@ impl ThumbnailRemoverActor {
pub async fn new_library(&self, Library { id, db, .. }: &Library) {
if self
.databases_tx
.send(DatabaseMessage::Add(*id, Arc::clone(db)))
.send(DatabaseMessage::Add(*id, db.clone()))
.await
.is_err()
{
@ -142,13 +117,6 @@ impl ThumbnailRemoverActor {
}
}
pub fn proxy(&self) -> ThumbnailRemoverActorProxy {
ThumbnailRemoverActorProxy {
cas_ids_to_delete_tx: self.cas_ids_to_delete_tx.clone(),
non_indexed_thumbnails_cas_ids_tx: self.non_indexed_thumbnails_cas_ids_tx.clone(),
}
}
async fn worker(
thumbnails_directory: PathBuf,
databases_rx: chan::Receiver<DatabaseMessage>,
@ -381,4 +349,21 @@ impl ThumbnailRemoverActor {
Ok(())
}
pub async fn new_non_indexed_thumbnail(&self, cas_id: String) {
if self
.non_indexed_thumbnails_cas_ids_tx
.send(cas_id)
.await
.is_err()
{
error!("Thumbnail remover actor is dead");
}
}
pub async fn remove_cas_ids(&self, cas_ids: Vec<String>) {
if self.cas_ids_to_delete_tx.send(cas_ids).await.is_err() {
error!("Thumbnail remover actor is dead");
}
}
}

View file

@ -0,0 +1,50 @@
use sd_p2p::spacetunnel::{Identity, IdentityErr, RemoteIdentity};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum IdentityOrRemoteIdentityErr {
#[error("IdentityErr({0})")]
IdentityErr(#[from] IdentityErr),
#[error("InvalidFormat")]
InvalidFormat,
}
/// TODO
#[derive(Debug, PartialEq)]
pub enum IdentityOrRemoteIdentity {
Identity(Identity),
RemoteIdentity(RemoteIdentity),
}
impl IdentityOrRemoteIdentity {
pub fn remote_identity(&self) -> RemoteIdentity {
match self {
Self::Identity(identity) => identity.to_remote_identity(),
Self::RemoteIdentity(identity) => {
RemoteIdentity::from_bytes(identity.to_bytes().as_slice()).expect("unreachable")
}
}
}
}
impl IdentityOrRemoteIdentity {
pub fn from_bytes(bytes: &[u8]) -> Result<Self, IdentityOrRemoteIdentityErr> {
match bytes[0] {
b'I' => Ok(Self::Identity(Identity::from_bytes(&bytes[1..])?)),
b'R' => Ok(Self::RemoteIdentity(RemoteIdentity::from_bytes(
&bytes[1..],
)?)),
_ => Err(IdentityOrRemoteIdentityErr::InvalidFormat),
}
}
pub fn to_bytes(&self) -> Vec<u8> {
match self {
Self::Identity(identity) => vec![&[b'I'], &*identity.to_bytes()].concat(),
Self::RemoteIdentity(identity) => {
vec![[b'R'].as_slice(), &identity.to_bytes()].concat()
}
}
}
}
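
An editorial aside: the encoding above is a single ASCII tag byte (`I` for a full keypair, `R` for a public-key-only remote identity) followed by the raw key bytes. A stand-alone sketch of the same framing, with plain byte slices standing in for `Identity`/`RemoteIdentity`; note that, unlike `from_bytes` above, this returns an error on empty input instead of panicking on `bytes[0]`:

fn encode(is_local: bool, key: &[u8]) -> Vec<u8> {
    // Tag byte first, then the key material.
    let tag = if is_local { b'I' } else { b'R' };
    let mut out = vec![tag];
    out.extend_from_slice(key);
    out
}

fn decode(bytes: &[u8]) -> Result<(bool, &[u8]), &'static str> {
    match bytes.first() {
        Some(&b'I') => Ok((true, &bytes[1..])),
        Some(&b'R') => Ok((false, &bytes[1..])),
        _ => Err("InvalidFormat"),
    }
}

fn main() {
    let framed = encode(true, &[1, 2, 3]);
    assert_eq!(framed, [b'I', 1, 2, 3]);
    // Round-trips back to the tag and the original key bytes.
    assert_eq!(decode(&framed), Ok((true, &[1u8, 2, 3][..])));
}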

View file

@ -1,11 +1,14 @@
#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Remove once this is fully stabilised
#![allow(dead_code)] // TODO: Remove once protocol is finished
mod identity_or_remote_identity;
mod p2p_manager;
mod pairing;
mod peer_metadata;
mod protocol;
pub mod sync;
pub use identity_or_remote_identity::*;
pub use p2p_manager::*;
pub use pairing::*;
pub use peer_metadata::*;

View file

@ -9,29 +9,30 @@ use std::{
use futures::Stream;
use sd_p2p::{
spaceblock::{BlockSize, SpaceblockRequest, Transfer},
spacetunnel::{Identity, Tunnel},
spacetunnel::{RemoteIdentity, Tunnel},
Event, Manager, ManagerError, ManagerStream, MetadataManager, PeerId,
};
use sd_sync::CRDTOperation;
use serde::Serialize;
use specta::Type;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt, BufReader},
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader},
sync::{broadcast, oneshot, Mutex},
time::sleep,
};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info};
use uuid::Uuid;
use crate::{
library::LibraryManager,
node::{NodeConfig, NodeConfigManager},
p2p::{OperatingSystem, SPACEDRIVE_APP_ID},
Node,
};
use super::{Header, PairingManager, PairingStatus, PeerMetadata};
use super::{
sync::{NetworkedLibraryManager, SyncMessage},
Header, PairingManager, PairingStatus, PeerMetadata,
};
/// The amount of time to wait for a Spacedrop request to be accepted or rejected before it's automatically rejected
const SPACEDROP_TIMEOUT: Duration = Duration::from_secs(60);
@ -69,20 +70,21 @@ pub struct P2PManager {
pub metadata_manager: Arc<MetadataManager<PeerMetadata>>,
pub spacedrop_progress: Arc<Mutex<HashMap<Uuid, broadcast::Sender<u8>>>>,
pub pairing: Arc<PairingManager>,
node_config_manager: Arc<NodeConfigManager>,
}
impl P2PManager {
pub async fn new(
node_config: Arc<NodeConfigManager>,
) -> Result<(P2PManager, ManagerStream<PeerMetadata>), ManagerError> {
) -> Result<(Arc<P2PManager>, ManagerStream<PeerMetadata>), ManagerError> {
let (config, keypair) = {
let config = node_config.get().await;
(
Self::config_to_metadata(&config /* , &library_manager */).await,
config.keypair,
)
// TODO: The `vec![]` here is problematic but will be fixed with delayed `MetadataManager`
(Self::config_to_metadata(&config, vec![]), config.keypair)
};
// TODO: Delay building this until the libraries are loaded
let metadata_manager = MetadataManager::new(config);
let (manager, stream) =
@ -100,37 +102,35 @@ impl P2PManager {
let spacedrop_pairing_reqs = Arc::new(Mutex::new(HashMap::new()));
let spacedrop_progress = Arc::new(Mutex::new(HashMap::new()));
let pairing = PairingManager::new(manager.clone(), tx.clone());
let pairing = PairingManager::new(manager.clone(), tx.clone(), metadata_manager.clone());
// TODO: proper shutdown
// https://docs.rs/ctrlc/latest/ctrlc/
// https://docs.rs/system_shutdown/latest/system_shutdown/
Ok((
Self {
Arc::new(Self {
pairing,
events: (tx, rx),
manager,
spacedrop_pairing_reqs,
metadata_manager,
spacedrop_progress,
},
node_config_manager: node_config,
}),
stream,
))
}
pub fn start(&self, mut stream: ManagerStream<PeerMetadata>, node: Arc<Node>) {
// TODO: Probs remove this once connection timeout/keepalive are working correctly
pub fn start(
&self,
mut stream: ManagerStream<PeerMetadata>,
library_manager: Arc<LibraryManager>,
nlm: Arc<NetworkedLibraryManager>,
) {
tokio::spawn({
let manager = self.manager.clone();
async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
manager.broadcast(Header::Ping.to_bytes()).await;
}
}
});
tokio::spawn({
let metadata_manager = self.metadata_manager.clone();
let events = self.events.0.clone();
let spacedrop_pairing_reqs = self.spacedrop_pairing_reqs.clone();
let spacedrop_progress = self.spacedrop_progress.clone();
@ -154,16 +154,39 @@ impl P2PManager {
.map_err(|_| error!("Failed to send event to p2p event stream!"))
.ok();
// TODO: Don't just connect to everyone when we find them. We should only do it if we know them.
// TODO(Spacedrop): Disable Spacedrop for now
// event.dial().await;
nlm.peer_discovered(event).await;
}
Event::PeerExpired { id, metadata } => {
debug!("Peer '{}' expired with metadata: {:?}", id, metadata);
nlm.peer_expired(id).await;
}
Event::PeerConnected(event) => {
debug!("Peer '{}' connected", event.peer_id);
nlm.peer_connected(event.peer_id).await;
if event.establisher {
let manager = manager.clone();
let nlm = nlm.clone();
let instances = metadata_manager.get().instances;
tokio::spawn(async move {
let mut stream = manager.stream(event.peer_id).await.unwrap();
Self::resync(nlm, &mut stream, event.peer_id, instances).await;
});
}
}
Event::PeerDisconnected(peer_id) => {
debug!("Peer '{}' disconnected", peer_id);
nlm.peer_disconnected(peer_id).await;
}
Event::PeerMessage(event) => {
let events = events.clone();
let metadata_manager = metadata_manager.clone();
let spacedrop_pairing_reqs = spacedrop_pairing_reqs.clone();
let spacedrop_progress = spacedrop_progress.clone();
let pairing = pairing.clone();
let node = node.clone();
let library_manager = library_manager.clone();
let nlm = nlm.clone();
tokio::spawn(async move {
let mut stream = event.stream;
@ -231,38 +254,53 @@ impl P2PManager {
}
Header::Pair => {
pairing
.responder(event.peer_id, stream, &node.library_manager)
.responder(event.peer_id, stream, &library_manager)
.await;
}
Header::Sync(library_id) => {
let mut stream = Tunnel::from_stream(stream).await.unwrap();
// Header -> Tunnel -> SyncMessage
let mut len = [0; 4];
stream.read_exact(&mut len).await.unwrap();
let len = u32::from_le_bytes(len);
let mut tunnel = Tunnel::responder(stream).await.unwrap();
let mut buf = vec![0; len as usize]; // TODO: Designed for easily being able to DoS the current Node
stream.read_exact(&mut buf).await.unwrap();
let msg =
SyncMessage::from_stream(&mut tunnel).await.unwrap();
let mut buf: &[u8] = &buf;
let operations: Vec<CRDTOperation> =
rmp_serde::from_read(&mut buf).unwrap();
let library =
library_manager.get_library(library_id).await.unwrap();
debug!("ingesting sync events for library '{library_id}': {operations:?}");
dbg!(&msg);
let Some(library) = node.library_manager.get_library(library_id).await else {
warn!("error ingesting sync messages. no library by id '{library_id}' found!");
return;
let ingest = &library.sync.ingest;
match msg {
SyncMessage::NewOperations => {
// This ends up in `NetworkedLibraryManager::request_and_ingest_ops`.
// TODO: Throwing the tunnel around like this makes it soooo confusing.
ingest.notify(tunnel, event.peer_id).await;
}
SyncMessage::OperationsRequest(id) => {
nlm.exchange_sync_ops(
tunnel,
&event.peer_id,
library_id,
&library.sync,
)
.await;
}
SyncMessage::OperationsRequestResponse(_) => {
todo!("unreachable but add proper error handling")
}
};
for op in operations {
library.sync.apply_op(op).await.unwrap_or_else(|err| {
error!(
"error ingesting operation for library '{}': {err:?}",
library.id
);
});
}
}
Header::Connected(identities) => {
Self::resync_handler(
nlm,
&mut stream,
event.peer_id,
metadata_manager.get().instances,
identities,
)
.await
}
}
});
@ -287,37 +325,63 @@ impl P2PManager {
});
}
async fn config_to_metadata(
config: &NodeConfig,
// library_manager: &LibraryManager,
) -> PeerMetadata {
fn config_to_metadata(config: &NodeConfig, instances: Vec<RemoteIdentity>) -> PeerMetadata {
PeerMetadata {
name: config.name.clone(),
operating_system: Some(OperatingSystem::get_os()),
version: Some(env!("CARGO_PKG_VERSION").to_string()),
email: config.p2p_email.clone(),
img_url: config.p2p_img_url.clone(),
// instances: library_manager
// .get_all_instances()
// .await
// .into_iter()
// .filter_map(|i| {
// Identity::from_bytes(&i.identity)
// .map(|i| hex::encode(i.public_key().to_bytes()))
// .ok()
// })
// .collect(),
instances,
}
}
#[allow(unused)] // TODO: Should probs be using this
pub async fn update_metadata(
&self,
node_config_manager: &NodeConfigManager,
library_manager: &LibraryManager,
// TODO: Remove this & move to `NetworkedLibraryManager`??? or make it private?
pub async fn update_metadata(&self, instances: Vec<RemoteIdentity>) {
self.metadata_manager.update(Self::config_to_metadata(
&self.node_config_manager.get().await,
instances,
));
}
pub async fn resync(
nlm: Arc<NetworkedLibraryManager>,
stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
peer_id: PeerId,
instances: Vec<RemoteIdentity>,
) {
self.metadata_manager
.update(Self::config_to_metadata(&node_config_manager.get().await).await);
// TODO: Make this encrypted using node to node auth so it can't be messed with in transport
stream
.write_all(&Header::Connected(instances).to_bytes())
.await
.unwrap();
let Header::Connected(identities) =
Header::from_stream(stream).await.unwrap() else {
panic!("unreachable but error handling")
};
for identity in identities {
nlm.peer_connected2(identity, peer_id.clone()).await;
}
}
pub async fn resync_handler(
nlm: Arc<NetworkedLibraryManager>,
stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
peer_id: PeerId,
local_identities: Vec<RemoteIdentity>,
remote_identities: Vec<RemoteIdentity>,
) {
for identity in remote_identities {
nlm.peer_connected2(identity, peer_id.clone()).await;
}
stream
.write_all(&Header::Connected(local_identities).to_bytes())
.await
.unwrap();
}
pub async fn accept_spacedrop(&self, id: Uuid, path: String) {
@ -336,62 +400,8 @@ impl P2PManager {
self.events.0.subscribe()
}
pub async fn broadcast_sync_events(
&self,
library_id: Uuid,
_identity: &Identity,
event: Vec<CRDTOperation>,
library_manager: &LibraryManager,
) {
println!("broadcasting sync events!");
let mut buf = match rmp_serde::to_vec_named(&event) {
Ok(buf) => buf,
Err(e) => {
error!("Failed to serialize sync event: {:?}", e);
return;
}
};
let mut head_buf = Header::Sync(library_id).to_bytes(); // Max Sync payload is like 4GB
head_buf.extend_from_slice(&(buf.len() as u32).to_le_bytes());
head_buf.append(&mut buf);
// TODO: Determine which clients we share that library with
// TODO: Establish a connection to them
let _library = library_manager.get_library(library_id).await.unwrap();
todo!();
// TODO: probs cache this query in memory cause this is gonna be stupid frequent
// let target_nodes = library
// .db
// .node()
// .find_many(vec![])
// .exec()
// .await
// .unwrap()
// .into_iter()
// .map(|n| {
// PeerId::from_str(&n.node_peer_id.expect("Node was missing 'node_peer_id'!"))
// .unwrap()
// })
// .collect::<Vec<_>>();
// info!(
// "Sending sync messages for library '{}' to nodes with peer id's '{:?}'",
// library_id, target_nodes
// );
// // TODO: Do in parallel
// for peer_id in target_nodes {
// let stream = self.manager.stream(peer_id).await.map_err(|_| ()).unwrap(); // TODO: handle providing incorrect peer id
// let mut tunnel = Tunnel::from_stream(stream).await.unwrap();
// tunnel.write_all(&head_buf).await.unwrap();
// }
pub async fn ping(&self) {
self.manager.broadcast(Header::Ping.to_bytes()).await;
}
// TODO: Proper error handling

View file

@ -57,6 +57,14 @@ macro_rules! impl_for_models {
Ok(())
}
pub fn model_name(&self) -> &'static str {
match self {
$(
Self::$variant(_) => stringify!($model),
)*
}
}
}
/// This exists to determine the next model to sync.

View file

@ -8,15 +8,16 @@ use std::{
use chrono::Utc;
use futures::channel::oneshot;
use sd_p2p::{spacetunnel::Identity, Manager, PeerId};
use sd_p2p::{spacetunnel::Identity, Manager, MetadataManager, PeerId};
use sd_prisma::prisma::instance;
use serde::{Deserialize, Serialize};
use specta::Type;
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
sync::broadcast,
};
use tracing::{debug, info};
use tracing::{error, info};
use uuid::Uuid;
mod initial_sync;
@ -28,7 +29,7 @@ use proto::*;
use crate::{
library::{LibraryManager, LibraryName},
node::{NodeConfig, Platform},
p2p::Header,
p2p::{Header, IdentityOrRemoteIdentity, P2PManager},
};
use super::{P2PEvent, PeerMetadata};
@ -38,21 +39,21 @@ pub struct PairingManager {
events_tx: broadcast::Sender<P2PEvent>,
pairing_response: RwLock<HashMap<u16, oneshot::Sender<PairingDecision>>>,
manager: Arc<Manager<PeerMetadata>>,
// library_manager: Arc<LibraryManager>,
metadata_manager: Arc<MetadataManager<PeerMetadata>>,
}
impl PairingManager {
pub fn new(
manager: Arc<Manager<PeerMetadata>>,
events_tx: broadcast::Sender<P2PEvent>,
// library_manager: Arc<LibraryManager>,
metadata_manager: Arc<MetadataManager<PeerMetadata>>,
) -> Arc<Self> {
Arc::new(Self {
id: AtomicU16::new(0),
events_tx,
pairing_response: RwLock::new(HashMap::new()),
manager,
// library_manager,
metadata_manager,
})
}
@ -92,9 +93,11 @@ impl PairingManager {
// 1. Create new instance for originator and send it to the responder
self.emit_progress(pairing_id, PairingStatus::PairingRequested);
let now = Utc::now();
let identity = Identity::new();
let self_instance_id = Uuid::new_v4();
let req = PairingRequest(Instance {
id: Uuid::new_v4(),
identity: Identity::new(), // TODO: Public key only
id: self_instance_id,
identity: identity.to_remote_identity(),
node_id: node_config.id,
node_name: node_config.name.clone(),
node_platform: Platform::current(),
@ -138,25 +141,66 @@ impl PairingManager {
return;
}
let library_config = library_manager
let (this, instances): (Vec<_>, Vec<_>) = instances
.into_iter()
.partition(|i| i.id == self_instance_id);
if this.len() != 1 {
todo!("error handling");
}
let this = this.first().expect("unreachable");
if this.identity != identity.to_remote_identity() {
todo!("error handling. Something went really wrong!");
}
let library = library_manager
.create_with_uuid(
library_id,
LibraryName::new(library_name).unwrap(),
library_description,
node_config,
false, // We will sync everything which will conflict with the seeded stuff
Some(instance::Create {
pub_id: this.id.as_bytes().to_vec(),
identity: IdentityOrRemoteIdentity::Identity(identity).to_bytes(),
node_id: this.node_id.as_bytes().to_vec(),
node_name: this.node_name.clone(), // TODO: Remove `clone`
node_platform: this.node_platform as i32,
last_seen: this.last_seen.into(),
date_created: this.date_created.into(),
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![],
}),
)
.await
.unwrap();
let library = library_manager
.get_library(library_config.uuid)
.await
.unwrap();
let library = library_manager.get_library(library.id).await.unwrap();
library
.db
.instance()
.create_many(instances.into_iter().map(|i| i.into()).collect())
.create_many(
instances
.into_iter()
.map(|i| {
instance::CreateUnchecked {
pub_id: i.id.as_bytes().to_vec(),
identity: IdentityOrRemoteIdentity::RemoteIdentity(
i.identity,
)
.to_bytes(),
node_id: i.node_id.as_bytes().to_vec(),
node_name: i.node_name,
node_platform: i.node_platform as i32,
last_seen: i.last_seen.into(),
date_created: i.date_created.into(),
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![],
}
})
.collect(),
)
.exec()
.await
.unwrap();
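
The `IdentityOrRemoteIdentity` byte format is not shown in this diff; one plausible shape, purely illustrative, is a one-byte tag in front of the key material:

// Purely illustrative: tag 0 = we hold the keypair, tag 1 = public key
// only. The real encoding in the codebase may differ.
enum IdentityOrRemoteIdentitySketch {
    Identity(Vec<u8>),       // serialized keypair
    RemoteIdentity(Vec<u8>), // serialized public key
}

impl IdentityOrRemoteIdentitySketch {
    fn to_bytes(&self) -> Vec<u8> {
        match self {
            Self::Identity(b) => [&[0u8][..], b.as_slice()].concat(),
            Self::RemoteIdentity(b) => [&[1u8][..], b.as_slice()].concat(),
        }
    }
}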
@ -175,7 +219,15 @@ impl PairingManager {
}
synced += data.len();
data.insert(&library.db).await.unwrap();
let model_name = data.model_name();
match data.insert(&library.db).await {
Ok(_) => {}
Err(e) => {
error!("Error inserting '{model_name}' data: {:?}", e);
// TODO: Handle error
}
}
// Prevent divide by zero
if total != 0 {
@ -191,6 +243,17 @@ impl PairingManager {
}
}
// Called again so the new instances are picked up
library_manager.node.nlm.load_library(&library).await;
P2PManager::resync(
library_manager.node.nlm.clone(),
&mut stream,
peer_id,
self.metadata_manager.get().instances,
)
.await;
// TODO: Done message to frontend
self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id));
stream.flush().await.unwrap();
@ -216,14 +279,13 @@ impl PairingManager {
info!("Beginning pairing '{pairing_id}' as responder to remote peer '{peer_id}'");
// let inner = || async move {
let remote_instance = PairingRequest::from_stream(&mut stream).await.unwrap().0;
self.emit_progress(pairing_id, PairingStatus::PairingDecisionRequest);
self.events_tx
.send(P2PEvent::PairingRequest {
id: pairing_id,
name: remote_instance.node_name,
os: remote_instance.node_platform.into(),
name: remote_instance.node_name.clone(),
os: remote_instance.node_platform.clone().into(),
})
.ok();
@ -243,6 +305,25 @@ impl PairingManager {
info!("The user accepted pairing '{pairing_id}' for library '{library_id}'!");
let library = library_manager.get_library(library_id).await.unwrap();
// TODO: Rollback this on pairing failure
instance::Create {
pub_id: remote_instance.id.as_bytes().to_vec(),
identity: IdentityOrRemoteIdentity::RemoteIdentity(remote_instance.identity.clone())
.to_bytes(),
node_id: remote_instance.node_id.as_bytes().to_vec(),
node_name: remote_instance.node_name,
node_platform: remote_instance.node_platform as i32,
last_seen: remote_instance.last_seen.into(),
date_created: remote_instance.date_created.into(),
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![],
}
.to_query(&library.db)
.exec()
.await
.unwrap();
stream
.write_all(
&PairingResponse::Accepted {
@ -259,8 +340,9 @@ impl PairingManager {
.into_iter()
.map(|i| Instance {
id: Uuid::from_slice(&i.pub_id).unwrap(),
// TODO: If `i.identity` contains a public/private keypair replace it with the public key
identity: Identity::from_bytes(&i.identity).unwrap(),
identity: IdentityOrRemoteIdentity::from_bytes(&i.identity)
.unwrap()
.remote_identity(),
node_id: Uuid::from_slice(&i.node_id).unwrap(),
node_name: i.node_name,
node_platform: Platform::try_from(i.node_platform as u8)
@ -293,11 +375,11 @@ impl PairingManager {
pairing_id,
PairingStatus::InitialSyncProgress((synced as f32 / total as f32 * 100.0) as u8), // SAFETY: It's a percentage
);
debug!(
"Initial library sync cursor={:?} items={}",
cursor,
data.len()
);
// debug!(
// "Initial library sync cursor={:?} items={}",
// cursor,
// data.len()
// );
stream
.write_all(&SyncData::Data { total_models, data }.to_bytes().unwrap())
@ -310,6 +392,22 @@ impl PairingManager {
.await
.unwrap();
// Called again so the new instances are picked up
library_manager.node.nlm.load_library(&library).await;
let Header::Connected(remote_identities) = Header::from_stream(&mut stream).await.unwrap() else {
todo!("unreachable; todo error handling");
};
P2PManager::resync_handler(
library_manager.node.nlm.clone(),
&mut stream,
peer_id,
self.metadata_manager.get().instances,
remote_identities,
)
.await;
self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id));
stream.flush().await.unwrap();
}


@ -3,9 +3,8 @@ use std::str::FromStr;
use chrono::{DateTime, Utc};
use sd_p2p::{
proto::{decode, encode},
spacetunnel::Identity,
spacetunnel::RemoteIdentity,
};
use sd_prisma::prisma::*;
use tokio::io::{AsyncRead, AsyncReadExt};
use uuid::Uuid;
@ -22,7 +21,7 @@ use super::ModelData;
#[derive(Debug, PartialEq)]
pub struct Instance {
pub id: Uuid,
pub identity: Identity,
pub identity: RemoteIdentity,
pub node_id: Uuid,
pub node_name: String,
pub node_platform: Platform,
@ -30,22 +29,6 @@ pub struct Instance {
pub date_created: DateTime<Utc>,
}
impl From<Instance> for instance::CreateUnchecked {
fn from(i: Instance) -> Self {
Self {
pub_id: i.id.as_bytes().to_vec(),
identity: i.identity.to_bytes(),
node_id: i.node_id.as_bytes().to_vec(),
node_name: i.node_name,
node_platform: i.node_platform as i32,
last_seen: i.last_seen.into(),
date_created: i.date_created.into(),
// timestamp: Default::default(), // TODO: Source this properly!
_params: vec![],
}
}
}
/// 1. Request for pairing to a library that is owned and will be selected by the responder.
/// Sent `Originator` -> `Responder`.
#[derive(Debug, PartialEq)]
@ -98,7 +81,7 @@ impl Instance {
) -> Result<Self, (&'static str, decode::Error)> {
Ok(Self {
id: decode::uuid(stream).await.map_err(|e| ("id", e))?,
identity: Identity::from_bytes(
identity: RemoteIdentity::from_bytes(
&decode::buf(stream).await.map_err(|e| ("identity", e))?,
)
.unwrap(), // TODO: Error handling
@ -287,13 +270,16 @@ impl SyncData {
#[cfg(test)]
mod tests {
use sd_p2p::spacetunnel::Identity;
use super::*;
#[tokio::test]
async fn test_types() {
let identity = Identity::new();
let instance = || Instance {
id: Uuid::new_v4(),
identity: Identity::new(),
identity: identity.to_remote_identity(),
node_id: Uuid::new_v4(),
node_name: "Node Name".into(),
node_platform: Platform::current(),


@ -1,6 +1,7 @@
use std::{collections::HashMap, env, str::FromStr};
use sd_p2p::Metadata;
use itertools::Itertools;
use sd_p2p::{spacetunnel::RemoteIdentity, Metadata};
use serde::{Deserialize, Serialize};
use specta::Type;
@ -13,7 +14,9 @@ pub struct PeerMetadata {
pub(super) version: Option<String>,
pub(super) email: Option<String>,
pub(super) img_url: Option<String>,
// pub(super) instances: Vec<String>,
// TODO: Max vec length to prevent it being used to spam??
#[serde(skip)]
pub(super) instances: Vec<RemoteIdentity>,
}
impl Metadata for PeerMetadata {
@ -32,7 +35,22 @@ impl Metadata for PeerMetadata {
if let Some(img_url) = self.img_url {
map.insert("img_url".to_owned(), img_url);
}
// map.insert("instances".to_owned(), self.instances.into_iter().join(","));
// This is not pretty, but a DNS TXT record caps out at 255 characters, so we spread the data across multiple records. Be aware the mDNS library prefixes each record with `i_{n}=`, which counts towards the 255-character limit.
self.instances
.into_iter()
.map(|i| hex::encode(i.to_bytes()))
.collect::<Vec<_>>()
.join(",")
.chars()
.chunks(249 /* 255 minus 3 for "i_=" and 3 for the index digits */)
.into_iter()
.map(|c| c.collect::<String>())
.enumerate()
.for_each(|(i, s)| {
map.insert(format!("i_{}", i), s);
});
map
}
@ -55,15 +73,33 @@ impl Metadata for PeerMetadata {
version: data.get("version").map(|v| v.to_owned()),
email: data.get("email").map(|v| v.to_owned()),
img_url: data.get("img_url").map(|v| v.to_owned()),
// instances: data
// .get("instances")
// .ok_or_else(|| {
// "DNS record for field 'instances' missing. Unable to decode 'PeerMetadata'!"
// .to_owned()
// })?
// .split(',')
// .map(|s| s.parse().map_err(|_| "Unable to parse instance 'Uuid'!"))
// .collect::<Result<Vec<_>, _>>()?,
instances: {
let mut i = 0;
let mut instances = String::new();
loop {
if let Some(s) = data.get(&format!("i_{}", i)) {
instances.push_str(s);
i += 1;
} else {
break;
}
}
if instances.is_empty() {
return Err("DNS record for field 'instances' missing. Unable to decode 'PeerMetadata'!"
.to_owned());
}
instances
.split(',')
.map(|s| {
RemoteIdentity::from_bytes(
&hex::decode(s).map_err(|_| "Unable to decode instance!")?,
)
.map_err(|_| "Unable to parse instance!")
})
.collect::<Result<Vec<_>, _>>()?
},
})
}
}
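
A std-only sketch of the record chunking round trip above; because the payload is hex (pure ASCII), chunking by bytes is equivalent to the char-based chunking in the real code (helper names are hypothetical):

use std::collections::HashMap;

const CHUNK: usize = 249; // 255 minus the `i_{n}=` key overhead

fn to_records(joined: &str) -> HashMap<String, String> {
    joined
        .as_bytes()
        .chunks(CHUNK)
        .enumerate()
        .map(|(i, c)| (format!("i_{i}"), String::from_utf8_lossy(c).into_owned()))
        .collect()
}

fn from_records(map: &HashMap<String, String>) -> Option<String> {
    let mut joined = String::new();
    for i in 0.. {
        match map.get(&format!("i_{i}")) {
            Some(s) => joined.push_str(s),
            None => break,
        }
    }
    (!joined.is_empty()).then_some(joined)
}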


@ -1,11 +1,11 @@
use thiserror::Error;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncRead, AsyncReadExt};
use uuid::Uuid;
use sd_p2p::{
proto::{decode, encode},
spaceblock::{SpaceblockRequest, SpacedropRequestError},
spacetime::UnicastStream,
spacetunnel::RemoteIdentity,
};
/// TODO
@ -16,6 +16,9 @@ pub enum Header {
Spacedrop(SpaceblockRequest),
Pair,
Sync(Uuid),
// TODO: Remove need for this
Connected(Vec<RemoteIdentity>),
}
#[derive(Debug, Error)]
@ -31,7 +34,7 @@ pub enum HeaderError {
}
impl Header {
pub async fn from_stream(stream: &mut UnicastStream) -> Result<Self, HeaderError> {
pub async fn from_stream(stream: &mut (impl AsyncRead + Unpin)) -> Result<Self, HeaderError> {
let discriminator = stream
.read_u8()
.await
@ -48,6 +51,17 @@ impl Header {
.await
.map_err(HeaderError::SyncRequest)?,
)),
// TODO: Error handling
255 => Ok(Self::Connected({
let len = stream.read_u16_le().await.unwrap();
let mut identities = Vec::with_capacity(len as usize);
for _ in 0..len {
identities.push(
RemoteIdentity::from_bytes(&decode::buf(stream).await.unwrap()).unwrap(),
);
}
identities
})),
d => Err(HeaderError::DiscriminatorInvalid(d)),
}
}
@ -66,6 +80,17 @@ impl Header {
encode::uuid(&mut bytes, uuid);
bytes
}
Self::Connected(remote_identities) => {
let mut bytes = vec![255];
if remote_identities.len() > u16::MAX as usize {
panic!("Buf is too long!"); // TODO: Chunk this so it will never error
}
bytes.extend((remote_identities.len() as u16).to_le_bytes());
for identity in remote_identities {
encode::buf(&mut bytes, &identity.to_bytes());
}
bytes
}
}
}
}
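
Sketched std-only, the `Connected` wire layout is: discriminator 255, a little-endian u16 count, then each identity as a u32-length-prefixed blob (matching the new `encode::buf` framing). The raw byte vecs stand in for `RemoteIdentity::to_bytes`:

fn encode_connected(ids: &[Vec<u8>]) -> Vec<u8> {
    assert!(ids.len() <= u16::MAX as usize); // the real code panics here too
    let mut bytes = vec![255u8];
    bytes.extend((ids.len() as u16).to_le_bytes());
    for id in ids {
        bytes.extend_from_slice(&(id.len() as u32).to_le_bytes());
        bytes.extend_from_slice(id);
    }
    bytes
    // e.g. one 32-byte identity yields [255, 1, 0, 32, 0, 0, 0, <32 key bytes>]
}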

core/src/p2p/sync/mod.rs Normal file

@ -0,0 +1,278 @@
use std::{collections::HashMap, sync::Arc};
use futures::future::join_all;
use sd_core_sync::{ingest, GetOpsArgs, SyncManager};
use sd_p2p::{
spacetunnel::{RemoteIdentity, Tunnel},
DiscoveredPeer, PeerId,
};
use tokio::{io::AsyncWriteExt, sync::RwLock};
use tracing::debug;
use uuid::Uuid;
use crate::library::Library;
use super::{Header, IdentityOrRemoteIdentity, P2PManager, PeerMetadata};
mod proto;
pub use proto::*;
pub enum InstanceState {
Unavailable,
Discovered(PeerId),
Connected(PeerId),
}
pub struct LibraryData {
instances: HashMap<RemoteIdentity /* Identity public key */, InstanceState>,
}
pub struct NetworkedLibraryManager {
p2p: Arc<P2PManager>,
libraries: RwLock<HashMap<Uuid /* Library ID */, LibraryData>>,
}
impl NetworkedLibraryManager {
pub fn new(p2p: Arc<P2PManager>) -> Arc<Self> {
Arc::new(Self {
p2p,
libraries: Default::default(),
})
}
pub async fn load_library(&self, library: &Library) {
// TODO: Error handling
let instances = library
.db
.instance()
.find_many(vec![])
.exec()
.await
.unwrap();
let metadata_instances = instances
.iter()
.map(|i| {
IdentityOrRemoteIdentity::from_bytes(&i.identity)
.unwrap()
.remote_identity()
})
.collect();
let mut libraries = self.libraries.write().await;
libraries.insert(
library.id,
LibraryData {
instances: instances
.into_iter()
.filter_map(|i| {
// TODO: Error handling
match IdentityOrRemoteIdentity::from_bytes(&i.identity).unwrap() {
IdentityOrRemoteIdentity::Identity(identity) => {
Some((identity.to_remote_identity(), InstanceState::Unavailable))
}
// We don't own it so don't advertise it
IdentityOrRemoteIdentity::RemoteIdentity(_) => None,
}
})
.collect(),
},
);
self.p2p.update_metadata(metadata_instances).await;
}
pub async fn edit_library(&self, _library: &Library) {
// TODO: Send changes to all connected nodes!
// TODO: Update mdns
}
pub async fn delete_library(&self, library: &Library) {
// TODO: Do proper library delete/unpair procedure.
self.libraries.write().await.remove(&library.id);
// TODO: Update mdns
}
pub async fn peer_discovered(&self, event: DiscoveredPeer<PeerMetadata>) {
for lib in self.libraries.write().await.values_mut() {
if let Some((_pk, instance)) = lib
.instances
.iter_mut()
.find(|(pk, _)| event.metadata.instances.iter().any(|pk2| *pk2 == **pk))
{
if !matches!(instance, InstanceState::Connected(_)) {
let should_connect = matches!(instance, InstanceState::Unavailable);
*instance = InstanceState::Discovered(event.peer_id.clone());
if should_connect {
event.dial().await;
}
}
return; // PK can only exist once so we short circuit
}
}
}
pub async fn peer_expired(&self, id: PeerId) {
for lib in self.libraries.write().await.values_mut() {
for instance in lib.instances.values_mut() {
if let InstanceState::Discovered(peer_id) = instance {
if *peer_id == id {
*instance = InstanceState::Unavailable;
}
}
}
}
}
pub async fn peer_connected(&self, peer_id: PeerId) {
// TODO: This is a very suboptimal way of doing this because it assumes a discovery event will always arrive before the connection event, which is false.
// TODO: Hence part of the need for `Self::peer_connected2`
for lib in self.libraries.write().await.values_mut() {
for instance in lib.instances.values_mut() {
if let InstanceState::Discovered(id) = instance {
if *id == peer_id {
*instance = InstanceState::Connected(peer_id.clone());
return; // Will only exist once so we short circuit
}
}
}
}
}
// TODO: Remove need for this cause it's weird
pub async fn peer_connected2(&self, instance_id: RemoteIdentity, peer_id: PeerId) {
for lib in self.libraries.write().await.values_mut() {
if let Some(instance) = lib.instances.get_mut(&instance_id) {
*instance = InstanceState::Connected(peer_id.clone());
return; // Will only exist once so we short circuit
}
}
}
pub async fn peer_disconnected(&self, peer_id: PeerId) {
for lib in self.libraries.write().await.values_mut() {
for instance in lib.instances.values_mut() {
if let InstanceState::Connected(id) = instance {
if *id == peer_id {
*instance = InstanceState::Unavailable;
return; // Will only exist once so we short circuit
}
}
}
}
}
// TODO: Error handling
pub async fn alert_new_ops(&self, library_id: Uuid, sync: &Arc<SyncManager>) {
debug!("NetworkedLibraryManager::alert_new_ops({library_id})");
join_all(
self.libraries
.read()
.await
.get(&library_id)
.unwrap()
.instances
.iter()
.filter_map(|(_, i)| match i {
InstanceState::Connected(peer_id) => Some(peer_id),
_ => None,
})
// TODO: Deduplicate any duplicate peer ids -> This is an edge case but still
.map(|peer_id| {
let p2p = self.p2p.clone();
async move {
debug!("Alerting peer '{peer_id:?}' of new sync events for library '{library_id:?}'");
let mut stream =
p2p.manager.stream(*peer_id).await.map_err(|_| ()).unwrap(); // TODO: handle providing incorrect peer id
stream
.write_all(&Header::Sync(library_id).to_bytes())
.await
.unwrap();
let mut tunnel = Tunnel::initiator(stream).await.unwrap();
tunnel
.write_all(&SyncMessage::NewOperations.to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
let id = match SyncMessage::from_stream(&mut tunnel).await.unwrap() {
SyncMessage::OperationsRequest(resp) => resp,
_ => todo!("unreachable but proper error handling"),
};
self.exchange_sync_ops(tunnel, peer_id, library_id, sync)
.await;
}
}),
)
.await;
}
// Ask the remote for operations and then ingest them
pub async fn request_and_ingest_ops(
&self,
mut tunnel: Tunnel,
args: GetOpsArgs,
sync: &SyncManager,
library_id: Uuid,
) {
tunnel
.write_all(&SyncMessage::OperationsRequest(args).to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
let SyncMessage::OperationsRequestResponse(ops) = SyncMessage::from_stream(&mut tunnel).await.unwrap() else {
todo!("unreachable but proper error handling")
};
// debug!("Received sync events response w/ id '{id}' from peer '{peer_id:?}' for library '{library_id:?}'");
sync.ingest
.events
.send(ingest::Event::Messages(ingest::MessagesEvent {
instance_id: sync.instance,
messages: ops,
}))
.await
.map_err(|_| "TODO: Handle ingest channel closed, so we don't loose ops")
.unwrap();
}
// TODO: Error handling
pub async fn exchange_sync_ops(
&self,
mut tunnel: Tunnel,
peer_id: &PeerId,
library_id: Uuid,
sync: &SyncManager,
) {
let ops = sync
.get_ops(sd_core_sync::GetOpsArgs {
clocks: vec![],
count: 100,
})
.await
.unwrap();
debug!(
"Sending '{}' sync ops from peer '{peer_id:?}' for library '{library_id:?}'",
ops.len()
);
tunnel
.write_all(&SyncMessage::OperationsRequestResponse(ops).to_bytes())
.await
.unwrap();
}
}

core/src/p2p/sync/proto.rs Normal file

@ -0,0 +1,111 @@
use sd_core_sync::GetOpsArgs;
use sd_p2p::proto::{decode, encode};
use sd_sync::CRDTOperation;
use tokio::io::{AsyncRead, AsyncReadExt};
#[derive(Debug, PartialEq, Eq)]
pub enum SyncMessage {
NewOperations,
OperationsRequest(GetOpsArgs),
OperationsRequestResponse(Vec<CRDTOperation>),
}
impl SyncMessage {
// TODO: Per field errors for better error handling
pub async fn from_stream(stream: &mut (impl AsyncRead + Unpin)) -> std::io::Result<Self> {
match stream.read_u8().await? {
b'N' => Ok(Self::NewOperations),
b'R' => Ok(Self::OperationsRequest(
rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(),
)),
b'P' => Ok(Self::OperationsRequestResponse(
// TODO: Error handling
rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(),
)),
header => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Invalid sync message header: {}", header as char),
)),
}
}
pub fn to_bytes(&self) -> Vec<u8> {
match self {
Self::NewOperations => vec![b'N'],
Self::OperationsRequest(args) => {
let mut buf = vec![b'R'];
// TODO: Error handling
encode::buf(&mut buf, &rmp_serde::to_vec_named(&args).unwrap());
buf
}
Self::OperationsRequestResponse(ops) => {
let mut buf = vec![b'P'];
// TODO: Error handling
encode::buf(&mut buf, &rmp_serde::to_vec_named(&ops).unwrap());
buf
}
}
}
}
#[cfg(test)]
mod tests {
use sd_core_sync::NTP64;
use sd_sync::SharedOperation;
use serde_json::Value;
use uuid::Uuid;
use super::*;
#[tokio::test]
async fn test_types() {
{
let original = SyncMessage::NewOperations;
let mut cursor = std::io::Cursor::new(original.to_bytes());
let result = SyncMessage::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = SyncMessage::OperationsRequest(GetOpsArgs {
clocks: vec![],
count: 0,
});
let mut cursor = std::io::Cursor::new(original.to_bytes());
let result = SyncMessage::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = SyncMessage::OperationsRequestResponse(vec![]);
let mut cursor = std::io::Cursor::new(original.to_bytes());
let result = SyncMessage::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = SyncMessage::OperationsRequestResponse(vec![CRDTOperation {
instance: Uuid::new_v4(),
timestamp: NTP64(0),
id: Uuid::new_v4(),
typ: sd_sync::CRDTOperationType::Shared(SharedOperation {
record_id: Value::Null,
model: "name".to_string(),
data: sd_sync::SharedOperationData::Create,
}),
}]);
let mut cursor = std::io::Cursor::new(original.to_bytes());
let result = SyncMessage::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
}
}
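
Sequencing sketch of the three messages as driven by `alert_new_ops`, run over an in-memory pipe instead of a `Tunnel`; it assumes the `SyncMessage` and `GetOpsArgs` types above are in scope:

use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (mut initiator, mut responder) = tokio::io::duplex(64 * 1024);

    // Initiator: "I have new operations."
    initiator.write_all(&SyncMessage::NewOperations.to_bytes()).await?;
    assert_eq!(
        SyncMessage::from_stream(&mut responder).await?,
        SyncMessage::NewOperations
    );

    // Responder: asks for a batch of up to 100 ops.
    responder
        .write_all(&SyncMessage::OperationsRequest(GetOpsArgs { clocks: vec![], count: 100 }).to_bytes())
        .await?;
    let SyncMessage::OperationsRequest(_args) = SyncMessage::from_stream(&mut initiator).await? else {
        unreachable!()
    };

    // Initiator: replies with the (here empty) batch for the responder to ingest.
    initiator.write_all(&SyncMessage::OperationsRequestResponse(vec![]).to_bytes()).await?;
    assert!(matches!(
        SyncMessage::from_stream(&mut responder).await?,
        SyncMessage::OperationsRequestResponse(_)
    ));
    Ok(())
}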


@ -115,10 +115,17 @@ impl InitConfig {
Some(lib) => lib,
None => {
let library = library_manager
.create_with_uuid(lib.id, lib.name, lib.description, node_cfg.clone(), true)
.create_with_uuid(
lib.id,
lib.name,
lib.description,
node_cfg.clone(),
true,
None,
)
.await?;
match library_manager.get_library(library.uuid).await {
match library_manager.get_library(library.id).await {
Some(lib) => lib,
None => {
warn!(


@ -22,7 +22,9 @@ tokio = { workspace = true, features = [
] }
libp2p = { version = "0.51.3", features = ["tokio", "serde"] }
libp2p-quic = { version = "0.7.0-alpha.3", features = ["tokio"] }
if-watch = { version = "3.0.1", features = ["tokio"] } # Override the features of if-watch which is used by libp2p-quic
if-watch = { version = "3.0.1", features = [
"tokio",
] } # Override the features of if-watch which is used by libp2p-quic
mdns-sd = "0.6.1"
thiserror = "1.0.40"
tracing = "0.1.37"
@ -32,10 +34,10 @@ specta = { workspace = true }
flume = "0.10.14"
tokio-util = { version = "0.7.8", features = ["compat"] }
arc-swap = "1.6.0"
p384 = { version = "0.13.0", feature = ["ecdh"] }
ed25519-dalek = { version = "1.0.1", features = ["rand"] }
rand_core = { version = "0.5.1", features = ["getrandom"] }
uuid = "1.4.0"
hex = "0.4.3"
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread"] }


@ -213,7 +213,7 @@ where
),
}
}
ManagerStreamAction::StartStream(peer_id, rx) => {
ManagerStreamAction::StartStream(peer_id, tx) => {
if !self.swarm.connected_peers().any(|v| *v == peer_id.0) {
let addresses = self
.mdns
@ -242,7 +242,7 @@ where
self.on_establish_streams
.entry(peer_id.0)
.or_default()
.push(OutboundRequest::Unicast(rx));
.push(OutboundRequest::Unicast(tx));
} else {
self.swarm
.behaviour_mut()
@ -250,7 +250,7 @@ where
.push_back(ToSwarm::NotifyHandler {
peer_id: peer_id.0,
handler: NotifyHandler::Any,
event: OutboundRequest::Unicast(rx),
event: OutboundRequest::Unicast(tx),
});
}
}


@ -38,4 +38,6 @@ impl<TMetadata: Metadata> DiscoveredPeer<TMetadata> {
pub struct ConnectedPeer {
/// The peer id of the connected peer
pub peer_id: PeerId,
/// Did I open the connection?
pub establisher: bool,
}


@ -42,7 +42,7 @@ pub mod decode {
/// Deserialize buf as its u32 length and data.
pub async fn buf(stream: &mut (impl AsyncRead + Unpin)) -> Result<Vec<u8>, Error> {
let len = stream.read_u16_le().await?;
let len = stream.read_u32_le().await?;
let mut buf = vec![0u8; len as usize];
stream.read_exact(&mut buf).await?;
@ -61,21 +61,19 @@ pub mod encode {
/// Serialize string as its u16 length and data.
pub fn string(buf: &mut Vec<u8>, s: &str) {
let len_buf = (s.len() as u16).to_le_bytes();
if s.len() > u16::MAX as usize {
panic!("String is too long!"); // TODO: Error handling
panic!("String is too long!"); // TODO: Chunk this so it will never error
}
buf.extend_from_slice(&len_buf);
buf.extend_from_slice(&(s.len() as u16).to_le_bytes());
buf.extend(s.as_bytes());
}
/// Serialize buf as its u32 length and data.
pub fn buf(buf: &mut Vec<u8>, b: &[u8]) {
let len_buf = (b.len() as u16).to_le_bytes();
if b.len() > u16::MAX as usize {
panic!("Buf is too long!"); // TODO: Error handling
if b.len() > u32::MAX as usize {
panic!("Buf is too long!"); // TODO: Chunk this so it will never error
}
buf.extend_from_slice(&len_buf);
buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
buf.extend(b);
}
}
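
A std-only round trip of the new u32 framing (helper names are hypothetical):

fn encode_buf_sketch(out: &mut Vec<u8>, b: &[u8]) {
    assert!(b.len() <= u32::MAX as usize);
    out.extend_from_slice(&(b.len() as u32).to_le_bytes());
    out.extend_from_slice(b);
}

fn decode_buf_sketch(input: &[u8]) -> Option<(Vec<u8>, &[u8])> {
    let len = u32::from_le_bytes(input.get(..4)?.try_into().ok()?) as usize;
    let rest = &input[4..];
    (rest.len() >= len).then(|| (rest[..len].to_vec(), &rest[len..]))
}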


@ -118,6 +118,10 @@ impl<TMetadata: Metadata> NetworkBehaviour for SpaceTime<TMetadata> {
self.pending_events.push_back(ToSwarm::GenerateEvent(
ManagerStreamAction::Event(Event::PeerConnected(ConnectedPeer {
peer_id,
establisher: match endpoint {
ConnectedPoint::Dialer { .. } => true,
ConnectedPoint::Listener { .. } => false,
},
})),
));
}
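
The same dialer/listener mapping in isolation, using libp2p's `ConnectedPoint` (the helper name is hypothetical):

use libp2p::core::ConnectedPoint;

// `true` when we dialed (opened) the connection, `false` when we accepted it.
fn is_establisher(endpoint: &ConnectedPoint) -> bool {
    matches!(endpoint, ConnectedPoint::Dialer { .. })
}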


@ -1,3 +1,5 @@
use std::hash::{Hash, Hasher};
use ed25519_dalek::PublicKey;
use rand_core::OsRng;
use thiserror::Error;
@ -35,17 +37,27 @@ impl Identity {
self.0.to_bytes().to_vec()
}
pub fn public_key(&self) -> PublicKey {
self.0.public
}
pub fn to_remote_identity(&self) -> RemoteIdentity {
RemoteIdentity(self.0.public)
}
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq)]
pub struct RemoteIdentity(ed25519_dalek::PublicKey);
impl Hash for RemoteIdentity {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.as_bytes().hash(state);
}
}
impl std::fmt::Debug for RemoteIdentity {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("RemoteIdentity")
.field(&hex::encode(self.0.as_bytes()))
.finish()
}
}
impl RemoteIdentity {
pub fn from_bytes(bytes: &[u8]) -> Result<Self, IdentityErr> {
Ok(Self(ed25519_dalek::PublicKey::from_bytes(bytes)?))
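
What `Identity::new()` plus `to_remote_identity()` boil down to with ed25519-dalek 1.x, per the dependency changes above; a sketch, assuming the `hex` crate from the same dependency list:

use ed25519_dalek::Keypair;
use rand_core::OsRng;

fn main() {
    // Generate a keypair locally; only the public half ever leaves the node.
    let keypair = Keypair::generate(&mut OsRng);
    let remote = keypair.public; // what `RemoteIdentity` wraps
    println!("{}", hex::encode(remote.as_bytes()));
}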


@ -4,17 +4,30 @@ use std::{
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use crate::spacetime::UnicastStream;
#[derive(Debug)]
pub struct Tunnel {
stream: UnicastStream,
}
impl Tunnel {
// TODO: Proper errors
pub async fn from_stream(mut stream: UnicastStream) -> Result<Self, &'static str> {
pub async fn initiator(mut stream: UnicastStream) -> Result<Self, &'static str> {
stream
.write_all(&[b'T'])
.await
.map_err(|_| "Error writing discriminator")?;
// TODO: Do pairing + authentication
Ok(Self { stream })
}
// TODO: Proper errors
pub async fn responder(mut stream: UnicastStream) -> Result<Self, &'static str> {
let discriminator = stream
.read_u8()
.await
@ -23,7 +36,7 @@ impl Tunnel {
return Err("Invalid discriminator. Is this stream actually a tunnel?");
}
// TODO: Do pairing
// TODO: Do pairing + authentication
Ok(Self { stream })
}
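
Usage sketch of the one-byte handshake over an in-memory pipe; the real code wraps a `UnicastStream`:

use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (mut initiator, mut responder) = tokio::io::duplex(1024);

    // The initiator announces the stream is a tunnel with a single b'T'...
    initiator.write_all(&[b'T']).await?;

    // ...and the responder rejects anything else before proceeding.
    match responder.read_u8().await? {
        b'T' => { /* pairing + authentication would follow here */ }
        other => panic!("invalid tunnel discriminator: {other}"),
    }
    Ok(())
}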


@ -9,11 +9,15 @@ pub struct PeerId(
pub(crate) libp2p::PeerId,
);
// impl PeerId {
// pub fn to_string(&self) -> String {
// self.0.to_string()
// }
// }
impl PeerId {
// pub fn to_string(&self) -> String {
// self.0.to_string()
// }
pub fn random() -> Self {
Self(libp2p::PeerId::random())
}
}
impl FromStr for PeerId {
#[allow(deprecated)]


@ -22,7 +22,7 @@ impl std::fmt::Display for OperationKind<'_> {
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug, Type)]
pub struct RelationOperation {
pub relation_item: Value,
pub relation_group: Value,
@ -36,7 +36,7 @@ impl RelationOperation {
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug, Type)]
pub enum RelationOperationData {
#[serde(rename = "c")]
Create,
@ -56,7 +56,7 @@ impl RelationOperationData {
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug, Type)]
pub struct SharedOperation {
pub record_id: Value,
pub model: String,
@ -69,7 +69,7 @@ impl SharedOperation {
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug, Type)]
pub enum SharedOperationData {
#[serde(rename = "c")]
Create,
@ -112,7 +112,7 @@ impl SharedOperationData {
// pub items: Vec<OwnedOperationItem>,
// }
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug, Type)]
#[serde(untagged)]
pub enum CRDTOperationType {
Shared(SharedOperation),
@ -120,7 +120,7 @@ pub enum CRDTOperationType {
// Owned(OwnedOperation),
}
#[derive(Serialize, Deserialize, Clone, Type)]
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Type)]
pub struct CRDTOperation {
pub instance: Uuid,
#[specta(type = u32)]


@ -83,7 +83,7 @@ export type Procedures = {
{ key: "notifications.listen", input: never, result: Notification } |
{ key: "p2p.events", input: never, result: P2PEvent } |
{ key: "p2p.spacedropProgress", input: string, result: number } |
{ key: "sync.newMessage", input: LibraryArgs<null>, result: CRDTOperation }
{ key: "sync.newMessage", input: LibraryArgs<null>, result: null }
};
export type BuildInfo = { version: string; commit: string }


@ -52,7 +52,13 @@ export const ClientContextProvider = ({
currentLibraryCache.id = currentLibraryId;
return (
<ClientContext.Provider value={{ currentLibraryId, libraries, library }}>
<ClientContext.Provider
value={{
currentLibraryId,
libraries,
library
}}
>
{children}
</ClientContext.Provider>
);


@ -1,4 +1,4 @@
import { PropsWithChildren, createContext, useContext, useState } from 'react';
import { PropsWithChildren, createContext, useContext } from 'react';
import { LibraryConfigWrapped } from '../core';
import { useBridgeSubscription } from '../rspc';
import { ClientContext, useClientContext } from './useClientContext';


@ -5876,7 +5876,7 @@ packages:
magic-string: 0.27.0
react-docgen-typescript: 2.2.2(typescript@5.0.4)
typescript: 5.0.4
vite: 4.3.9(less@4.1.3)
vite: 4.3.9(@types/node@18.15.1)
/@jridgewell/gen-mapping@0.3.3:
resolution: {integrity: sha512-HLhSWOLRi875zjjMG/r+Nv0oCW8umGb0BgEhyX3dDX3egwZtB8PqLnjz3yedt8R5StBrzcg4aBpnh8UA9D1BoQ==}
@ -24272,7 +24272,6 @@ packages:
rollup: 3.24.1
optionalDependencies:
fsevents: 2.3.2
dev: true
/vite@4.3.9(less@4.1.3):
resolution: {integrity: sha512-qsTNZjO9NoJNW7KnOrgYwczm0WctJ8m/yqYAMAK9Lxt4SoySUfS5S8ia9K7JHpa3KEeMfyF8LoJ3c5NeBJy6pg==}