[ENG-1488] Cloud sync message compression (#1922)

* CRDTOperationWithoutInstance

* almost there

* fully compress messages

* implement more of sd-cloud-api

* sd-cloud-api-ify

* landing -_-

* openssl

* clippy

* idk

* bruh

* wut

* ahhh right
This commit is contained in:
Brendan Allan 2024-01-09 18:21:23 +08:00 committed by GitHub
parent b94d5207a9
commit 4962b1160b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 795 additions and 355 deletions

View file

@ -182,14 +182,7 @@ async fn main() -> tauri::Result<()> {
let (_guard, result) = match Node::init_logger(&data_dir) {
Ok(guard) => (
Some(guard),
Node::new(
data_dir,
sd_core::Env {
api_url: "https://app.spacedrive.com".to_string(),
client_id: CLIENT_ID.to_string(),
},
)
.await,
Node::new(data_dir, sd_core::Env::new(CLIENT_ID)).await,
),
Err(err) => (None, Err(NodeError::Logger(err))),
};
@ -446,7 +439,8 @@ fn mouse_position(window: &Window) -> (f64, f64) {
fn difference(a: f64, b: f64) -> f64 {
let x = a - b;
if x < 0.0 {
return x * -1.0;
x * -1.0
} else {
x
}
return x;
}

View file

@ -1,5 +1,4 @@
use std::{
io,
net::Ipv4Addr,
pin::Pin,
sync::Arc,

View file

@ -46,10 +46,14 @@ export async function Sidebar() {
{navigationWithReleases.map((section) => {
const Icon = iconConfig[section.slug];
const href = section.categories[0]?.docs[0]?.url;
if (!href) return null;
return (
<SectionLink
// Use the first page in the section as the link
href={section.categories[0].docs[0].url}
href={href}
key={section.slug}
slug={section.slug}
>

View file

@ -56,7 +56,10 @@ export default async function Page() {
</span>
</p>
<Downloads
latestVersion={[toTitleCase(frontmatter.category), `v${release.tag_name}`]
latestVersion={[
frontmatter.category && toTitleCase(frontmatter.category),
`v${release.tag_name}`
]
.filter(Boolean)
.join(' ')}
/>

View file

@ -75,15 +75,9 @@ pub fn handle_core_msg(
let _guard = Node::init_logger(&data_dir);
// TODO: probably don't unwrap
let new_node = Node::new(
data_dir,
sd_core::Env {
api_url: "https://app.spacedrive.com".to_string(),
client_id: CLIENT_ID.to_string(),
},
)
.await
.unwrap();
let new_node = Node::new(data_dir, sd_core::Env::new(CLIENT_ID))
.await
.unwrap();
node.replace(new_node.clone());
new_node
}

View file

@ -40,8 +40,10 @@ async fn main() {
let (node, router) = match Node::new(
data_dir,
sd_core::Env {
api_url: std::env::var("SD_API_URL")
.unwrap_or_else(|_| "https://app.spacedrive.com".to_string()),
api_url: tokio::sync::Mutex::new(
std::env::var("SD_API_URL")
.unwrap_or_else(|_| "https://app.spacedrive.com".to_string()),
),
client_id: std::env::var("SD_CLIENT_ID")
.unwrap_or_else(|_| "04701823-a498-406e-aef9-22081c1dae34".to_string()),
},

View file

@ -136,35 +136,13 @@ impl Actor {
if !is_old {
self.apply_op(op).await.ok();
}
// self.db
// ._transaction()
// .run({
// let timestamps = self.timestamps.clone();
// |db| async move {
// match db
// .instance()
// .update(
// instance::pub_id::equals(uuid_to_bytes(op_instance)),
// vec![instance::timestamp::set(Some(timestamp.as_u64() as i64))],
// )
// .exec()
// .await
// {
// Ok(_) => {
self.timestamps.write().await.insert(op_instance, timestamp);
// Ok(())
// }
// Err(e) => Err(e),
// }
// }
// })
// .await
// .unwrap();
self.timestamps.write().await.insert(op_instance, timestamp);
}
}
async fn apply_op(&mut self, op: CRDTOperation) -> prisma_client_rust::Result<()> {
// TODO: Needs to be transaction-ified
ModelSyncData::from_op(op.clone())
.unwrap()
.exec(&self.db)

View file

@ -208,7 +208,6 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
.await?;
assert_eq!(out.len(), 3);
assert!(matches!(out[0].typ, CRDTOperationType::Shared(_)));
instance1.teardown().await;
instance2.teardown().await;

View file

@ -1,23 +1,12 @@
use crate::util::http::ensure_response;
use std::time::Duration;
use reqwest::{Response, StatusCode};
use reqwest::StatusCode;
use rspc::alpha::AlphaRouter;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use specta::Type;
use super::{Ctx, R};
async fn parse_json_body<T: DeserializeOwned>(response: Response) -> Result<T, rspc::Error> {
response.json().await.map_err(|_| {
rspc::Error::new(
rspc::ErrorCode::InternalServerError,
"JSON conversion failed".to_string(),
)
})
}
pub(crate) fn mount() -> AlphaRouter<Ctx> {
R.router()
.procedure("loginSession", {
@ -30,30 +19,42 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
verification_url_complete: String,
},
Complete,
Error,
Error(String),
}
R.subscription(|node, _: ()| async move {
async_stream::stream! {
#[derive(Deserialize, Type)]
struct DeviceAuthorizationResponse {
device_code: String,
user_code: String,
verification_url: String,
verification_uri_complete: String,
}
#[derive(Deserialize, Type)]
struct DeviceAuthorizationResponse {
device_code: String,
user_code: String,
verification_url: String,
verification_uri_complete: String,
}
let Ok(auth_response) = (match node.http
.post(&format!("{}/login/device/code", &node.env.api_url))
async_stream::stream! {
let auth_response = match match node
.http
.post(&format!(
"{}/login/device/code",
&node.env.api_url.lock().await
))
.form(&[("client_id", &node.env.client_id)])
.send()
.await {
Ok(r) => r.json::<DeviceAuthorizationResponse>().await,
Err(e) => Err(e)
}) else {
yield Response::Error;
return;
};
.await
.map_err(|e| e.to_string())
{
Ok(r) => r.json::<DeviceAuthorizationResponse>().await.map_err(|e| e.to_string()),
Err(e) => {
yield Response::Error(e.to_string());
return
},
} {
Ok(v) => v,
Err(e) => {
yield Response::Error(e.to_string());
return
},
};
yield Response::Start {
user_code: auth_response.user_code.clone(),
@ -64,28 +65,30 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
yield loop {
tokio::time::sleep(Duration::from_secs(5)).await;
let Ok(token_resp) = node.http
.post(&format!("{}/login/oauth/access_token", &node.env.api_url))
let token_resp = match node.http
.post(&format!("{}/login/oauth/access_token", &node.env.api_url.lock().await))
.form(&[
("grant_type", sd_cloud_api::auth::DEVICE_CODE_URN),
("device_code", &auth_response.device_code),
("client_id", &node.env.client_id)
])
.send()
.await else {
break Response::Error;
.await {
Ok(v) => v,
Err(e) => break Response::Error(e.to_string())
};
match token_resp.status() {
StatusCode::OK => {
let Ok(token) = token_resp.json().await else {
break Response::Error;
let token = match token_resp.json().await {
Ok(v) => v,
Err(e) => break Response::Error(e.to_string())
};
if node.config
if let Err(e) = node.config
.write(|c| c.auth_token = Some(token))
.await.is_err() {
break Response::Error;
.await {
break Response::Error(e.to_string());
};
@ -97,19 +100,20 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
error: String
}
let Ok(resp) = token_resp.json::<OAuth400>().await else {
break Response::Error;
let resp = match token_resp.json::<OAuth400>().await {
Ok(v) => v,
Err(e) => break Response::Error(e.to_string())
};
match resp.error.as_str() {
"authorization_pending" => continue,
_ => {
break Response::Error;
e => {
break Response::Error(e.to_string())
}
}
},
_ => {
break Response::Error;
s => {
break Response::Error(s.to_string());
}
}
}
@ -133,21 +137,9 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
)
.procedure("me", {
R.query(|node, _: ()| async move {
#[derive(Serialize, Deserialize, Type)]
#[specta(inline)]
struct Response {
id: String,
email: String,
}
let resp = sd_cloud_api::user::me(node.cloud_api_config().await).await?;
node.authed_api_request(
node.http
.get(&format!("{}/api/v1/user/me", &node.env.api_url)),
)
.await
.and_then(ensure_response)
.map(parse_json_body::<Response>)?
.await
Ok(resp)
})
})
}

View file

@ -22,10 +22,22 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
R.router()
.merge("library.", library::mount())
.merge("locations.", locations::mount())
.procedure("getApiOrigin", {
R.query(|node, _: ()| async move { Ok(node.env.api_url.lock().await.to_string()) })
})
.procedure("setApiOrigin", {
R.mutation(|node, origin: String| async move {
let mut origin_env = node.env.api_url.lock().await;
*origin_env = origin;
node.config.write(|c| c.auth_token = None).await.ok();
Ok(())
})
})
}
mod library {
use super::*;
pub fn mount() -> AlphaRouter<Ctx> {
@ -116,11 +128,8 @@ mod locations {
};
use http_body::Full;
use serde::{Deserialize, Serialize};
use serde_json::json;
use specta::Type;
use crate::util::http::ensure_response;
use super::*;
#[derive(Type, Serialize, Deserialize)]
@ -129,58 +138,27 @@ mod locations {
name: String,
}
#[derive(Debug, Clone, Type, Deserialize)]
pub struct AuthoriseResponse {
access_key_id: String,
secret_access_key: String,
session_token: String,
}
pub fn mount() -> AlphaRouter<Ctx> {
R.router()
.procedure("list", {
R.query(|node, _: ()| async move {
let api_url = &node.env.api_url;
node.authed_api_request(node.http.get(&format!("{api_url}/api/v1/locations")))
.await
.and_then(ensure_response)
.map(parse_json_body::<Vec<CloudLocation>>)?
sd_cloud_api::locations::list(node.cloud_api_config().await)
.await
.map_err(Into::into)
})
})
.procedure("create", {
R.mutation(|node, name: String| async move {
let api_url = &node.env.api_url;
node.authed_api_request(
node.http
.post(&format!("{api_url}/api/v1/locations"))
.json(&json!({
"name": name
})),
)
.await
.and_then(ensure_response)
.map(parse_json_body::<CloudLocation>)?
.await
sd_cloud_api::locations::create(node.cloud_api_config().await, name)
.await
.map_err(Into::into)
})
})
.procedure("remove", {
R.mutation(|node, id: String| async move {
let api_url = &node.env.api_url;
node.authed_api_request(
node.http
.post(&format!("{api_url}/api/v1/locations/delete"))
.json(&json!({
"id": id
})),
)
.await
.and_then(ensure_response)?;
Ok(())
sd_cloud_api::locations::create(node.cloud_api_config().await, id)
.await
.map_err(Into::into)
})
})
// TODO: Remove this
@ -190,7 +168,7 @@ mod locations {
// Lazy::new(|| Mutex::new(None));
#[derive(Debug)]
pub struct CredentialsProvider(AuthoriseResponse);
pub struct CredentialsProvider(sd_cloud_api::locations::authorise::Response);
impl ProvideCredentials for CredentialsProvider {
fn provide_credentials<'a>(&'a self) -> future::ProvideCredentials<'a>
@ -221,19 +199,11 @@ mod locations {
let token = {
let token = &mut None; // AUTH_TOKEN.lock().await; // TODO: Caching of the token. For now it's annoying when debugging.
if token.is_none() {
let api_url = &node.env.api_url;
*token = Some(
node.authed_api_request(
node.http
.post(&format!("{api_url}/api/v1/locations/authorise"))
.json(&json!({
"id": params.id
})),
sd_cloud_api::locations::authorise(
node.cloud_api_config().await,
params.id,
)
.await
.and_then(ensure_response)
.map(parse_json_body::<AuthoriseResponse>)?
.await?,
);
}

View file

@ -210,7 +210,7 @@ pub fn mount() -> AlphaRouter<Ctx> {
}| async move {
let Library { db, .. } = library.as_ref();
let mut query = db.file_path().find_many({
let params = {
let mut params = Vec::new();
for filter in filters {
@ -218,7 +218,9 @@ pub fn mount() -> AlphaRouter<Ctx> {
}
params
});
};
let mut query = db.file_path().find_many(params);
if let Some(take) = take {
query = query.take(take as i64);

View file

@ -1,5 +1,3 @@
use crate::util::http::ensure_response;
use rspc::alpha::AlphaRouter;
use serde::{Deserialize, Serialize};
use specta::Type;
@ -17,21 +15,12 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
}
|node, args: Feedback| async move {
node.add_auth_header(
node.http
.post(&format!("{}/api/v1/feedback", &node.env.api_url)),
sd_cloud_api::feedback::send(
node.cloud_api_config().await,
args.message,
args.emoji,
)
.await
.json(&args)
.send()
.await
.map_err(|_| {
rspc::Error::new(
rspc::ErrorCode::InternalServerError,
"Request failed".to_string(),
)
})
.and_then(ensure_response)?;
.await?;
Ok(())
}

View file

@ -3,6 +3,7 @@ use crate::cloud::sync::err_return;
use std::sync::Arc;
use tokio::sync::Notify;
use tracing::info;
use super::Library;
@ -39,6 +40,8 @@ pub async fn run_actor((library, notify): (Arc<Library>, Arc<Notify>)) {
.await
);
info!("Got {} cloud ops to ingest", ops.len());
err_return!(
sync.ingest
.event_tx

View file

@ -1,4 +1,8 @@
use crate::{library::Library, Node};
use sd_sync::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use std::sync::{atomic, Arc};
@ -63,3 +67,108 @@ macro_rules! err_return {
pub(crate) use err_return;
use tokio::sync::Notify;
pub type CompressedCRDTOperationsForModel = Vec<(Value, Vec<CompressedCRDTOperation>)>;
#[derive(Serialize, Deserialize)]
pub struct CompressedCRDTOperations(Vec<(Uuid, Vec<(String, CompressedCRDTOperationsForModel)>)>);
impl CompressedCRDTOperations {
pub fn new(ops: Vec<CRDTOperation>) -> Self {
let mut compressed = vec![];
let mut ops_iter = ops.into_iter();
let Some(first) = ops_iter.next() else {
return Self(vec![]);
};
let mut instance_id = first.instance;
let mut instance = vec![];
let mut model_str = first.model.clone();
let mut model = vec![];
let mut record_id = first.record_id.clone();
let mut record = vec![first.into()];
for op in ops_iter {
if instance_id != op.instance {
model.push((
std::mem::replace(&mut record_id, op.record_id.clone()),
std::mem::take(&mut record),
));
instance.push((
std::mem::replace(&mut model_str, op.model.clone()),
std::mem::take(&mut model),
));
compressed.push((
std::mem::replace(&mut instance_id, op.instance),
std::mem::take(&mut instance),
));
} else if model_str != op.model {
model.push((
std::mem::replace(&mut record_id, op.record_id.clone()),
std::mem::take(&mut record),
));
instance.push((
std::mem::replace(&mut model_str, op.model.clone()),
std::mem::take(&mut model),
));
} else if record_id != op.record_id {
model.push((
std::mem::replace(&mut record_id, op.record_id.clone()),
std::mem::take(&mut record),
));
}
record.push(CompressedCRDTOperation::from(op))
}
model.push((record_id, record));
instance.push((model_str, model));
compressed.push((instance_id, instance));
Self(compressed)
}
pub fn into_ops(self) -> Vec<CRDTOperation> {
let mut ops = vec![];
for (instance_id, instance) in self.0 {
for (model_str, model) in instance {
for (record_id, record) in model {
for op in record {
ops.push(CRDTOperation {
instance: instance_id,
model: model_str.clone(),
record_id: record_id.clone(),
timestamp: op.timestamp,
id: op.id,
data: op.data,
})
}
}
}
}
ops
}
}
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone)]
pub struct CompressedCRDTOperation {
pub timestamp: NTP64,
pub id: Uuid,
pub data: CRDTOperationData,
}
impl From<CRDTOperation> for CompressedCRDTOperation {
fn from(value: CRDTOperation) -> Self {
Self {
timestamp: value.timestamp,
id: value.id,
data: value.data,
}
}
}

View file

@ -1,13 +1,14 @@
use crate::{
cloud::sync::{err_break, err_return},
cloud::sync::{err_break, err_return, CompressedCRDTOperations},
library::Library,
Node,
};
use sd_core_sync::NTP64;
use sd_prisma::prisma::{cloud_crdt_operation, instance, PrismaClient, SortOrder};
use sd_sync::*;
use sd_sync::CRDTOperation;
use sd_utils::{from_bytes_to_uuid, uuid_to_bytes};
use tracing::info;
use std::{
collections::{hash_map::Entry, HashMap},
@ -17,14 +18,12 @@ use std::{
use base64::prelude::*;
use chrono::Utc;
use serde::Deserialize;
use serde_json::{json, to_vec};
use serde_json::to_vec;
use tokio::{sync::Notify, time::sleep};
use uuid::Uuid;
pub async fn run_actor((library, node, ingest_notify): (Arc<Library>, Arc<Node>, Arc<Notify>)) {
let db = &library.db;
let api_url = &library.env.api_url;
let library_id = library.id;
let mut cloud_timestamps = {
@ -57,53 +56,49 @@ pub async fn run_actor((library, node, ingest_notify): (Arc<Library>, Arc<Node>,
.collect::<HashMap<_, _>>()
};
info!(
"Fetched timestamps for {} local instances",
cloud_timestamps.len()
);
loop {
let instances = {
err_break!(
db.instance()
.find_many(vec![])
.select(instance::select!({ pub_id }))
.exec()
.await
)
.into_iter()
.map(|i| {
let uuid = from_bytes_to_uuid(&i.pub_id);
json!({
"instanceUuid": uuid,
"fromTime": cloud_timestamps.get(&uuid).cloned().unwrap_or_default().as_u64().to_string()
})
})
.collect::<Vec<_>>()
};
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct MessageCollection {
instance_uuid: Uuid,
// start_time: String,
end_time: String,
contents: String,
}
let instances = err_break!(
db.instance()
.find_many(vec![])
.select(instance::select!({ pub_id }))
.exec()
.await
);
{
let collections = node
.authed_api_request(
node.http
.post(&format!(
"{api_url}/api/v1/libraries/{library_id}/messageCollections/get"
))
.json(&json!({
"instanceUuid": library.instance_uuid,
"timestamps": instances
})),
let collections = {
use sd_cloud_api::library::message_collections;
message_collections::get(
node.cloud_api_config().await,
library_id,
library.instance_uuid,
instances
.into_iter()
.map(|i| {
let uuid = from_bytes_to_uuid(&i.pub_id);
message_collections::get::InstanceTimestamp {
instance_uuid: uuid,
from_time: cloud_timestamps
.get(&uuid)
.cloned()
.unwrap_or_default()
.as_u64()
.to_string(),
}
})
.collect::<Vec<_>>(),
)
.await
.expect("couldn't get response")
.json::<Vec<MessageCollection>>()
.await
.expect("couldn't deserialize response");
};
let collections = err_break!(collections);
info!("Received {} collections", collections.len());
let mut cloud_library_data: Option<Option<sd_cloud_api::Library>> = None;
@ -152,15 +147,12 @@ pub async fn run_actor((library, node, ingest_notify): (Arc<Library>, Arc<Node>,
e.insert(NTP64(0));
}
err_break!(
write_cloud_ops_to_db(
err_break!(serde_json::from_slice(err_break!(
&BASE64_STANDARD.decode(collection.contents)
))),
db
)
.await
);
let compressed_operations: CompressedCRDTOperations =
err_break!(serde_json::from_slice(err_break!(
&BASE64_STANDARD.decode(collection.contents)
)));
err_break!(write_cloud_ops_to_db(compressed_operations.into_ops(), db).await);
let collection_timestamp =
NTP64(collection.end_time.parse().expect("unable to parse time"));
@ -196,7 +188,7 @@ fn crdt_op_db(op: &CRDTOperation) -> cloud_crdt_operation::Create {
id: op.id.as_bytes().to_vec(),
timestamp: op.timestamp.0 as i64,
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
kind: op.kind().to_string(),
kind: op.data.as_kind().to_string(),
data: to_vec(&op.data).expect("unable to serialize data"),
model: op.model.to_string(),
record_id: to_vec(&op.record_id).expect("unable to serialize record id"),

View file

@ -1,4 +1,4 @@
use crate::{cloud::sync::err_break, Node};
use crate::{cloud::sync::CompressedCRDTOperations, Node};
use sd_core_sync::{GetOpsArgs, SyncMessage, NTP64};
use sd_prisma::prisma::instance;
@ -6,16 +6,12 @@ use sd_utils::from_bytes_to_uuid;
use std::{sync::Arc, time::Duration};
use serde::Deserialize;
use serde_json::json;
use tokio::time::sleep;
use uuid::Uuid;
use super::Library;
use super::{err_break, Library};
pub async fn run_actor((library, node): (Arc<Library>, Arc<Node>)) {
let db = &library.db;
let api_url = &library.env.api_url;
let library_id = library.id;
loop {
@ -28,35 +24,22 @@ pub async fn run_actor((library, node): (Arc<Library>, Arc<Node>)) {
.await
)
.into_iter()
.map(|i| json!({ "instanceUuid": from_bytes_to_uuid(&i.pub_id).to_string() }))
.map(|i| from_bytes_to_uuid(&i.pub_id))
.collect::<Vec<_>>();
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct RequestAdd {
instance_uuid: Uuid,
from_time: Option<String>,
// mutex key on the instance
key: String,
}
let req_adds = err_break!(
err_break!(
node.authed_api_request(
node.http
.post(&format!(
"{api_url}/api/v1/libraries/{library_id}/messageCollections/requestAdd"
))
.json(&json!({ "instances": instances })),
)
.await
sd_cloud_api::library::message_collections::request_add(
node.cloud_api_config().await,
library_id,
instances,
)
.json::<Vec<RequestAdd>>()
.await
);
let mut instances = vec![];
use sd_cloud_api::library::message_collections::do_add;
for req_add in req_adds {
let ops = err_break!(
library
@ -84,49 +67,21 @@ pub async fn run_actor((library, node): (Arc<Library>, Arc<Node>)) {
let start_time = ops[0].timestamp.0.to_string();
let end_time = ops[ops.len() - 1].timestamp.0.to_string();
instances.push(json!({
"uuid": req_add.instance_uuid,
"key": req_add.key,
"startTime": start_time,
"endTime": end_time,
"contents": ops,
}))
instances.push(do_add::Input {
uuid: req_add.instance_uuid,
key: req_add.key,
start_time,
end_time,
contents: serde_json::to_value(CompressedCRDTOperations::new(ops))
.expect("CompressedCRDTOperation should serialize!"),
})
}
tracing::debug!("Number of instances: {}", instances.len());
tracing::debug!(
"Number of messages: {}",
instances
.iter()
.map(|i| i["contents"].as_array().expect("no contents found").len())
.sum::<usize>()
);
if instances.is_empty() {
break;
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct DoAdd {
// instance_uuid: Uuid,
// from_time: String,
}
let _responses = err_break!(
err_break!(
node.authed_api_request(
node.http
.post(&format!(
"{api_url}/api/v1/libraries/{library_id}/messageCollections/doAdd",
))
.json(&json!({ "instances": instances })),
)
.await
)
.json::<Vec<DoAdd>>()
.await
);
err_break!(do_add(node.cloud_api_config().await, library_id, instances,).await);
}
{

View file

@ -1,4 +1,15 @@
use tokio::sync::Mutex;
pub struct Env {
pub api_url: String,
pub api_url: Mutex<String>,
pub client_id: String,
}
impl Env {
pub fn new(client_id: &str) -> Self {
Self {
api_url: Mutex::new("https://app.spacedrive.com".to_string()),
client_id: client_id.to_string(),
}
}
}

View file

@ -291,7 +291,7 @@ impl Node {
pub async fn cloud_api_config(&self) -> sd_cloud_api::RequestConfig {
sd_cloud_api::RequestConfig {
client: self.http.clone(),
api_url: self.env.api_url.clone(),
api_url: self.env.api_url.lock().await.clone(),
auth_token: self.config.get().await.auth_token,
}
}

View file

@ -23,9 +23,7 @@ impl Actors {
actor_fn: impl FnOnce() -> F + Send + Sync + Clone + 'static,
autostart: bool,
) {
let mut actors = self.actors.lock().await;
actors.insert(
self.actors.lock().await.insert(
name.to_string(),
Arc::new(Actor {
abort_handle: Default::default(),

View file

@ -362,9 +362,9 @@ impl Entry {
pub async fn get_all_entries(path: PathBuf) -> Result<Vec<Entry>, NonIndexedLocationError> {
tokio::task::spawn_blocking(move || {
let path = &path;
let mut dir = std::fs::read_dir(&path).map_err(|e| (path, e))?;
let dir = std::fs::read_dir(path).map_err(|e| (path, e))?;
let mut entries = Vec::new();
while let Some(entry) = dir.next() {
for entry in dir {
let entry = entry.map_err(|e| (path, e))?;
// We must not keep `entry` around as we will quickly hit the OS limit on open file descriptors

View file

@ -90,6 +90,7 @@ impl OrphanRemoverActor {
.await
{
error!("Failed to remove orphaned objects: {e:#?}");
break;
}
}
}

View file

@ -58,10 +58,10 @@ impl<S: Stream + Unpin> Stream for BatchedStream<S> {
Poll::Pending
} else {
let batch = std::mem::take(batch);
return Poll::Ready(Some(batch));
Poll::Ready(Some(batch))
}
}
BatchedStreamProj::Complete => return Poll::Ready(None),
BatchedStreamProj::Complete => Poll::Ready(None),
}
}
}

View file

@ -1,6 +0,0 @@
use reqwest::Response;
pub fn ensure_response(resp: Response) -> Result<Response, rspc::Error> {
resp.error_for_status()
.map_err(|e| rspc::Error::new(rspc::ErrorCode::InternalServerError, e.to_string()))
}

View file

@ -2,7 +2,6 @@ mod abort_on_drop;
mod batched_stream;
#[cfg(debug_assertions)]
pub mod debug_initializer;
pub mod http;
mod infallible_request;
mod maybe_undefined;
pub mod mpscrr;

View file

@ -41,6 +41,16 @@ pub struct Instance {
pub identity: String,
}
#[derive(Serialize, Deserialize, Debug, Type)]
#[serde(rename_all = "camelCase")]
#[specta(rename = "CloudMessageCollection")]
pub struct MessageCollection {
pub instance_uuid: Uuid,
pub start_time: String,
pub end_time: String,
pub contents: String,
}
trait WithAuth {
fn with_auth(self, token: OAuthToken) -> Self;
}
@ -54,6 +64,69 @@ impl WithAuth for reqwest::RequestBuilder {
}
}
pub mod feedback {
use super::*;
pub use send::exec as send;
pub mod send {
use super::*;
pub async fn exec(config: RequestConfig, message: String, emoji: u8) -> Result<(), Error> {
let mut req = config
.client
.post(format!("{}/api/v1/feedback", config.api_url))
.json(&json!({
"message": message,
"emoji": emoji,
}));
if let Some(auth_token) = config.auth_token {
req = req.with_auth(auth_token);
}
req.send()
.await
.and_then(|r| r.error_for_status())
.map_err(|e| Error(e.to_string()))?;
Ok(())
}
}
}
pub mod user {
use super::*;
pub use me::exec as me;
pub mod me {
use super::*;
#[derive(Serialize, Deserialize, Type)]
#[specta(inline)]
pub struct Response {
id: String,
email: String,
}
pub async fn exec(config: RequestConfig) -> Result<Response, Error> {
let Some(auth_token) = config.auth_token else {
return Err(Error("Authentication required".to_string()));
};
config
.client
.get(&format!("{}/api/v1/user/me", config.api_url))
.with_auth(auth_token)
.send()
.await
.map_err(|e| Error(e.to_string()))?
.json()
.await
.map_err(|e| Error(e.to_string()))
}
}
}
pub mod library {
use super::*;
@ -182,4 +255,255 @@ pub mod library {
Ok(())
}
}
pub mod message_collections {
use super::*;
pub use get::exec as get;
pub mod get {
use super::*;
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct InstanceTimestamp {
pub instance_uuid: Uuid,
pub from_time: String,
}
pub async fn exec(
config: RequestConfig,
library_id: Uuid,
this_instance_uuid: Uuid,
timestamps: Vec<InstanceTimestamp>,
) -> Result<Response, Error> {
let Some(auth_token) = config.auth_token else {
return Err(Error("Authentication required".to_string()));
};
config
.client
.post(&format!(
"{}/api/v1/libraries/{}/messageCollections/get",
config.api_url, library_id
))
.json(&json!({
"instanceUuid": this_instance_uuid,
"timestamps": timestamps
}))
.with_auth(auth_token)
.send()
.await
.map_err(|e| Error(e.to_string()))?
.json()
.await
.map_err(|e| Error(e.to_string()))
}
pub type Response = Vec<MessageCollection>;
}
pub use request_add::exec as request_add;
pub mod request_add {
use super::*;
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RequestAdd {
pub instance_uuid: Uuid,
pub from_time: Option<String>,
// mutex key on the instance
pub key: String,
}
pub async fn exec(
config: RequestConfig,
library_id: Uuid,
instances: Vec<Uuid>,
) -> Result<Response, Error> {
let Some(auth_token) = config.auth_token else {
return Err(Error("Authentication required".to_string()));
};
let instances = instances
.into_iter()
.map(|i| json!({"instanceUuid": i }))
.collect::<Vec<_>>();
config
.client
.post(&format!(
"{}/api/v1/libraries/{}/messageCollections/requestAdd",
config.api_url, library_id
))
.json(&json!({ "instances": instances }))
.with_auth(auth_token)
.send()
.await
.map_err(|e| Error(e.to_string()))?
.json()
.await
.map_err(|e| Error(e.to_string()))
}
pub type Response = Vec<RequestAdd>;
}
pub use do_add::exec as do_add;
pub mod do_add {
use super::*;
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Input {
pub uuid: Uuid,
pub key: String,
pub start_time: String,
pub end_time: String,
pub contents: serde_json::Value,
}
pub async fn exec(
config: RequestConfig,
library_id: Uuid,
instances: Vec<Input>,
) -> Result<(), Error> {
let Some(auth_token) = config.auth_token else {
return Err(Error("Authentication required".to_string()));
};
config
.client
.post(&format!(
"{}/api/v1/libraries/{}/messageCollections/requestAdd",
config.api_url, library_id
))
.json(&json!({ "instances": instances }))
.with_auth(auth_token)
.send()
.await
.and_then(|r| r.error_for_status())
.map_err(|e| Error(e.to_string()))?;
Ok(())
}
}
}
}
#[derive(Type, Serialize, Deserialize)]
pub struct CloudLocation {
id: String,
name: String,
}
pub mod locations {
use super::*;
pub use list::exec as list;
pub mod list {
use super::*;
pub async fn exec(config: RequestConfig) -> Result<Response, Error> {
let Some(auth_token) = config.auth_token else {
return Err(Error("Authentication required".to_string()));
};
config
.client
.get(&format!("{}/api/v1/locations", config.api_url))
.with_auth(auth_token)
.send()
.await
.map_err(|e| Error(e.to_string()))?
.json()
.await
.map_err(|e| Error(e.to_string()))
}
pub type Response = Vec<CloudLocation>;
}
pub use create::exec as create;
pub mod create {
use super::*;
pub async fn exec(config: RequestConfig, name: String) -> Result<Response, Error> {
let Some(auth_token) = config.auth_token else {
return Err(Error("Authentication required".to_string()));
};
config
.client
.post(&format!("{}/api/v1/locations", config.api_url))
.json(&json!({
"name": name,
}))
.with_auth(auth_token)
.send()
.await
.map_err(|e| Error(e.to_string()))?
.json()
.await
.map_err(|e| Error(e.to_string()))
}
pub type Response = CloudLocation;
}
pub use remove::exec as remove;
pub mod remove {
use super::*;
pub async fn exec(config: RequestConfig, id: String) -> Result<Response, Error> {
let Some(auth_token) = config.auth_token else {
return Err(Error("Authentication required".to_string()));
};
config
.client
.post(&format!("{}/api/v1/locations/delete", config.api_url))
.json(&json!({
"id": id,
}))
.with_auth(auth_token)
.send()
.await
.map_err(|e| Error(e.to_string()))?
.json()
.await
.map_err(|e| Error(e.to_string()))
}
pub type Response = CloudLocation;
}
pub use authorise::exec as authorise;
pub mod authorise {
use super::*;
pub async fn exec(config: RequestConfig, id: String) -> Result<Response, Error> {
let Some(auth_token) = config.auth_token else {
return Err(Error("Authentication required".to_string()));
};
config
.client
.post(&format!("{}/api/v1/locations/authorise", config.api_url))
.json(&json!({ "id": id }))
.with_auth(auth_token)
.send()
.await
.map_err(|e| Error(e.to_string()))?
.json()
.await
.map_err(|e| Error(e.to_string()))
}
#[derive(Debug, Clone, Type, Deserialize)]
pub struct Response {
pub access_key_id: String,
pub secret_access_key: String,
pub session_token: String,
}
}
}

View file

@ -0,0 +1,84 @@
#[derive(Serialize, Deserialize)]
pub struct CompressedCRDTOperations(
Vec<(
Uuid,
Vec<(String, Vec<(Value, Vec<CompressedCRDTOperation>)>)>,
)>,
);
impl CompressedCRDTOperations {
pub fn new(ops: Vec<CRDTOperation>) -> Self {
let mut compressed = vec![];
let mut ops_iter = ops.into_iter();
let Some(first) = ops_iter.next() else {
return Self(vec![]);
};
let mut instance_id = first.instance;
let mut instance = vec![];
let mut model_str = first.model.clone();
let mut model = vec![];
let mut record_id = first.record_id.clone();
let mut record = vec![first.into()];
for op in ops_iter {
if instance_id != op.instance {
model.push((
std::mem::replace(&mut record_id, op.record_id),
std::mem::take(&mut record),
));
instance.push((
std::mem::replace(&mut model_str, op.model),
std::mem::take(&mut model),
));
compressed.push((
std::mem::replace(&mut instance_id, op.instance),
std::mem::take(&mut instance),
));
} else if model_str != op.model {
model.push((
std::mem::replace(&mut record_id, op.record_id),
std::mem::take(&mut record),
));
instance.push((
std::mem::replace(&mut model_str, op.model),
std::mem::take(&mut model),
));
} else if record_id != op.record_id {
model.push((
std::mem::replace(&mut record_id, op.record_id),
std::mem::take(&mut record),
));
}
record.push(CompressedCRDTOperation::from(op))
}
model.push((record_id, record));
instance.push((model_str, model));
compressed.push((instance_id, instance));
Self(compressed)
}
}
#[derive(PartialEq, Eq, Serialize, Deserialize, Clone)]
pub struct CompressedCRDTOperation {
pub timestamp: NTP64,
pub id: Uuid,
pub data: CRDTOperationData,
}
impl From<CRDTOperation> for CompressedCRDTOperation {
fn from(value: CRDTOperation) -> Self {
Self {
timestamp: value.timestamp,
id: value.id,
data: value.data,
}
}
}

View file

@ -33,7 +33,7 @@ pub enum CRDTOperationData {
}
impl CRDTOperationData {
fn as_kind(&self) -> OperationKind {
pub fn as_kind(&self) -> OperationKind {
match self {
Self::Create => OperationKind::Create,
Self::Update { field, .. } => OperationKind::Update(field),

View file

@ -1,9 +1,12 @@
import { CheckSquare } from '@phosphor-icons/react';
import { useQueryClient } from '@tanstack/react-query';
import { useNavigate } from 'react-router';
import {
auth,
backendFeatures,
features,
toggleFeatureFlag,
useBridgeMutation,
useBridgeQuery,
useDebugState,
useFeatureFlags,
@ -43,6 +46,10 @@ export default () => {
}
>
<div className="no-scrollbar block h-96 w-[430px] overflow-y-scroll pb-4">
<Setting mini title="Cloud Origin" description="Change the cloud origin to use">
<CloudOriginSelect />
</Setting>
<Setting
mini
title="rspc Logger"
@ -232,3 +239,31 @@ function FeatureFlagSelector() {
// </Setting>
// );
// }
function CloudOriginSelect() {
const origin = useBridgeQuery(['cloud.getApiOrigin']);
const setOrigin = useBridgeMutation(['cloud.setApiOrigin']);
const queryClient = useQueryClient();
return (
<>
{origin.data && (
<Select
onChange={(v) =>
setOrigin.mutateAsync(v).then(() => {
auth.logout();
queryClient.invalidateQueries();
})
}
value={origin.data}
>
<SelectOption value="https://app.spacedrive.com">
https://app.spacedrive.com
</SelectOption>
<SelectOption value="http://localhost:3000">http://localhost:3000</SelectOption>
</Select>
)}
</>
);
}

View file

@ -13,6 +13,7 @@ import {
} from '@sd/client';
import { useRootContext } from '~/app/RootContext';
import { LibraryIdParamsSchema } from '~/app/route-schemas';
import ErrorFallback, { BetterErrorBoundary } from '~/ErrorFallback';
import {
useKeybindEventHandler,
useOperatingSystem,
@ -79,22 +80,24 @@ const Layout = () => {
showControls.transparentBg ? 'bg-app/80' : 'bg-app'
)}
>
{library ? (
<QuickPreviewContextProvider>
<LibraryContextProvider library={library}>
<Suspense
fallback={<div className="h-screen w-screen bg-app" />}
>
<Outlet />
<DragOverlay />
</Suspense>
</LibraryContextProvider>
</QuickPreviewContextProvider>
) : (
<h1 className="p-4 text-white">
Please select or create a library in the sidebar.
</h1>
)}
<BetterErrorBoundary FallbackComponent={ErrorFallback}>
{library ? (
<QuickPreviewContextProvider>
<LibraryContextProvider library={library}>
<Suspense
fallback={<div className="h-screen w-screen bg-app" />}
>
<Outlet />
<DragOverlay />
</Suspense>
</LibraryContextProvider>
</QuickPreviewContextProvider>
) : (
<h1 className="p-4 text-white">
Please select or create a library in the sidebar.
</h1>
)}
</BetterErrorBoundary>
</div>
</DndContext>
</div>

View file

@ -9,15 +9,17 @@ export const Component = () => {
const authState = auth.useStateSnapshot();
if (authState.status === 'loggedIn') return <Authenticated />;
if (authState.status === 'notLoggedIn' || authState.status === 'loggingIn')
return (
<div className="flex flex-row p-4">
<LoginButton />
</div>
);
const authSensitiveChild = () => {
if (authState.status === 'loggedIn') return <Authenticated />;
if (authState.status === 'notLoggedIn' || authState.status === 'loggingIn')
return <LoginButton />;
return null;
return null;
};
return (
<div className="flex h-full w-full flex-col items-start p-4">{authSensitiveChild()}</div>
);
};
function Authenticated() {
@ -32,7 +34,7 @@ function Authenticated() {
);
return (
<div className="flex flex-row p-4">
<>
{cloudLibrary.data ? (
<div className="flex flex-col items-start space-y-2">
<div>
@ -77,6 +79,6 @@ function Authenticated() {
</Button>
</div>
)}
</div>
</>
);
}

View file

@ -49,7 +49,7 @@ export default (props: UseDialogProps) => {
event: { type: 'libraryCreate' }
});
platform.refreshMenuBar && platform.refreshMenuBar();
platform.refreshMenuBar?.();
navigate(`/${library.uuid}`);
} catch (e) {

View file

@ -6,6 +6,7 @@ export type Procedures = {
{ key: "auth.me", input: never, result: { id: string; email: string } } |
{ key: "backups.getAll", input: never, result: GetAll } |
{ key: "buildInfo", input: never, result: BuildInfo } |
{ key: "cloud.getApiOrigin", input: never, result: string } |
{ key: "cloud.library.get", input: LibraryArgs<null>, result: { uuid: string; name: string; instances: CloudInstance[]; ownerId: string } | null } |
{ key: "cloud.library.list", input: never, result: CloudLibrary[] } |
{ key: "cloud.locations.list", input: never, result: CloudLocation[] } |
@ -58,8 +59,9 @@ export type Procedures = {
{ key: "cloud.library.create", input: LibraryArgs<null>, result: null } |
{ key: "cloud.library.join", input: string, result: LibraryConfigWrapped } |
{ key: "cloud.locations.create", input: string, result: CloudLocation } |
{ key: "cloud.locations.remove", input: string, result: null } |
{ key: "cloud.locations.remove", input: string, result: CloudLocation } |
{ key: "cloud.locations.testing", input: TestingParams, result: null } |
{ key: "cloud.setApiOrigin", input: string, result: null } |
{ key: "ephemeralFiles.copyFiles", input: LibraryArgs<EphemeralFileSystemOps>, result: null } |
{ key: "ephemeralFiles.createFolder", input: LibraryArgs<CreateEphemeralFolderArgs>, result: string } |
{ key: "ephemeralFiles.cutFiles", input: LibraryArgs<EphemeralFileSystemOps>, result: null } |
@ -529,7 +531,7 @@ export type RescanArgs = { location_id: number; sub_path: string }
export type Resolution = { width: number; height: number }
export type Response = { Start: { user_code: string; verification_url: string; verification_url_complete: string } } | "Complete" | "Error"
export type Response = { Start: { user_code: string; verification_url: string; verification_url_complete: string } } | "Complete" | { Error: string }
export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent"

View file

@ -34,11 +34,11 @@ nonLibraryClient
store.state = { status: 'notLoggedIn' };
});
type CallbackStatus = 'success' | 'error' | 'cancel';
type CallbackStatus = 'success' | { error: string } | 'cancel';
const loginCallbacks = new Set<(status: CallbackStatus) => void>();
function onError() {
loginCallbacks.forEach((cb) => cb('error'));
function onError(error: string) {
loginCallbacks.forEach((cb) => cb({ error }));
}
export function login(config: ProviderConfig) {
@ -51,12 +51,14 @@ export function login(config: ProviderConfig) {
if (data === 'Complete') {
config.finish?.(authCleanup);
loginCallbacks.forEach((cb) => cb('success'));
} else if (data === 'Error') onError();
} else if ('Error' in data) onError(data.Error);
else {
authCleanup = config.start(data.Start.verification_url_complete);
}
},
onError
onError(e) {
onError(e.message);
}
});
return new Promise<void>((res, rej) => {
@ -69,7 +71,7 @@ export function login(config: ProviderConfig) {
res();
} else {
store.state = { status: 'notLoggedIn' };
rej();
rej(JSON.stringify(status));
}
};
loginCallbacks.add(cb);