diff --git a/.github/actions/setup-rust/action.yaml b/.github/actions/setup-rust/action.yaml index 04636be62..53b2222b1 100644 --- a/.github/actions/setup-rust/action.yaml +++ b/.github/actions/setup-rust/action.yaml @@ -16,7 +16,7 @@ runs: uses: dtolnay/rust-toolchain@stable with: target: ${{ inputs.target }} - toolchain: '1.73' + toolchain: '1.75' components: clippy, rustfmt - name: Cache Rust Dependencies diff --git a/Cargo.lock b/Cargo.lock index fd86b7c91..d76918699 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -312,7 +312,7 @@ checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -334,7 +334,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -345,7 +345,7 @@ checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -1027,7 +1027,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.42", + "syn 2.0.48", "which", ] @@ -1464,7 +1464,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -1819,7 +1819,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13b588ba4ac1a99f7f2964d24b3d896ddc6bf847ee3855dbd4366f058cfcd331" dependencies = [ "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -1829,7 +1829,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30d2b3721e861707777e3195b0158f950ae6dc4a27e4d02ff9f67e3eb3de199e" dependencies = [ "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -1899,7 +1899,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -1923,7 +1923,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -1934,7 +1934,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -2178,7 +2178,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -2340,7 +2340,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -2382,7 +2382,7 @@ checksum = "f95e2801cd355d4a1a3e3953ce6ee5ae9603a5c833455343a8bfe3f44d418246" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -2393,7 +2393,7 @@ checksum = "c2ad8cef1d801a4686bfd8919f0b30eac4c8e48968c437a6405ded4fb5272d2b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -2795,7 +2795,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -3203,7 +3203,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 2.1.0", + "indexmap 2.2.1", "slab", "tokio", "tokio-util", @@ -3700,9 +3700,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.1.0" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" +checksum = "433de089bd45971eecf4668ee0ee8f4cec17db4f8bd8f7bc3197a6ce37aa7d9b" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -5428,7 +5428,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -5797,7 +5797,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -5925,7 +5925,7 @@ dependencies = [ "phf_shared 0.11.2", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -5978,7 +5978,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -6038,7 +6038,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5699cc8a63d1aa2b1ee8e12b9ad70ac790d65788cd36101fa37f87ea46c4cef" dependencies = [ "base64 0.21.5", - "indexmap 2.1.0", + "indexmap 2.2.1", "line-wrap", "quick-xml", "serde", @@ -6122,7 +6122,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -6320,9 +6320,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.71" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -6347,7 +6347,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -6605,9 +6605,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.33" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] @@ -7389,6 +7389,14 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "sd-actors" +version = "0.1.0" +dependencies = [ + "futures", + "tokio", +] + [[package]] name = "sd-ai" version = "0.1.0" @@ -7494,9 +7502,11 @@ dependencies = [ "rmp-serde", "rmpv", "rspc", + "sd-actors", "sd-ai", "sd-cache", "sd-cloud-api", + "sd-core-cloud-sync", "sd-core-sync", "sd-crypto", "sd-ffmpeg", @@ -7533,6 +7543,25 @@ dependencies = [ "webp", ] +[[package]] +name = "sd-core-cloud-sync" +version = "0.1.0" +dependencies = [ + "base64 0.21.5", + "chrono", + "prisma-client-rust", + "sd-cloud-api", + "sd-core-sync", + "sd-prisma", + "sd-sync", + "sd-utils", + "serde", + "serde_json", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "sd-core-sync" version = "0.0.0" @@ -7930,9 +7959,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.193" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" dependencies = [ "serde_derive", ] @@ -7967,22 +7996,22 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.193" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] name = "serde_json" -version = "1.0.108" +version = "1.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.1", "itoa 1.0.10", "ryu", "serde", @@ -8006,7 +8035,7 @@ checksum = "3081f5ffbb02284dda55132aa26daecedd7372a42417bbbab6f14ab7d6bb9145" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -8040,7 +8069,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.1.0", + "indexmap 2.2.1", "serde", "serde_json", "serde_with_macros", @@ -8056,7 +8085,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -8326,7 +8355,7 @@ checksum = "18938fc0c7c0e8b7c8f4702e8cb53c218d4442f5b7420dd8b3d9640ca9a46712" dependencies = [ "bigdecimal 0.4.2", "chrono", - "indexmap 2.1.0", + "indexmap 2.2.1", "once_cell", "paste", "serde", @@ -8588,7 +8617,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -8631,9 +8660,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.42" +version = "2.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b7d0a2c048d661a1a59fcd7355baa232f7ed34e0ee4df2eef3c1c1c0d3852d8" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" dependencies = [ "proc-macro2", "quote", @@ -9010,7 +9039,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -9138,7 +9167,7 @@ checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -9259,7 +9288,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -9357,7 +9386,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.1", "serde", "serde_spanned", "toml_datetime", @@ -9370,7 +9399,7 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.1", "serde", "serde_spanned", "toml_datetime", @@ -9437,7 +9466,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] @@ -10125,7 +10154,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", "wasm-bindgen-shared", ] @@ -10159,7 +10188,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -10932,7 +10961,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.42", + "syn 2.0.48", ] [[package]] diff --git a/core/Cargo.toml b/core/Cargo.toml index 7eb64c957..c10b5b6c0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -22,6 +22,7 @@ ai = ["dep:sd-ai"] # Sub-crates sd-cache = { path = "../crates/cache" } sd-core-sync = { path = "./crates/sync" } +sd-core-cloud-sync = { path = "./crates/cloud-sync" } # sd-cloud-api = { path = "../crates/cloud-api" } sd-crypto = { path = "../crates/crypto", features = [ "rspc", @@ -122,6 +123,7 @@ tar = "0.4.40" aws-sdk-s3 = { version = "1.5.0", features = ["behavior-version-latest"] } aws-config = "1.0.3" aws-credential-types = "1.0.3" +sd-actors = { version = "0.1.0", path = "../crates/actors" } # Override features of transitive dependencies [dependencies.openssl] diff --git a/core/crates/cloud-sync/Cargo.toml b/core/crates/cloud-sync/Cargo.toml new file mode 100644 index 000000000..7339fd1de --- /dev/null +++ b/core/crates/cloud-sync/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "sd-core-cloud-sync" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +base64.workspace = true +chrono.workspace = true +sd-cloud-api = { path = "../../../crates/cloud-api" } +sd-core-sync = { path = "../sync" } +sd-prisma = { path = "../../../crates/prisma" } +sd-sync = { path = "../../../crates/sync" } +sd-utils = { path = "../../../crates/utils" } +serde.workspace = true +serde_json = "1.0.113" +tokio.workspace = true +tracing.workspace = true +uuid.workspace = true +prisma-client-rust = { workspace = true } diff --git a/core/src/cloud/sync/ingest.rs b/core/crates/cloud-sync/src/ingest.rs similarity index 74% rename from core/src/cloud/sync/ingest.rs rename to core/crates/cloud-sync/src/ingest.rs index ab59d2377..6a1265991 100644 --- a/core/src/cloud/sync/ingest.rs +++ b/core/crates/cloud-sync/src/ingest.rs @@ -1,15 +1,11 @@ -use crate::cloud::sync::err_return; +use crate::err_return; use std::sync::Arc; use tokio::sync::Notify; use tracing::info; -use super::Library; - -pub async fn run_actor((library, notify): (Arc, Arc)) { - let Library { sync, .. } = library.as_ref(); - +pub async fn run_actor(sync: Arc, notify: Arc) { loop { { let mut rx = sync.ingest.req_rx.lock().await; @@ -21,11 +17,11 @@ pub async fn run_actor((library, notify): (Arc, Arc)) { .await .is_ok() { - use crate::sync::ingest::*; - while let Some(req) = rx.recv().await { const OPS_PER_REQUEST: u32 = 1000; + use sd_core_sync::*; + let timestamps = match req { Request::FinishedIngesting => break, Request::Messages { timestamps } => timestamps, @@ -33,7 +29,7 @@ pub async fn run_actor((library, notify): (Arc, Arc)) { }; let ops = err_return!( - sync.get_cloud_ops(crate::sync::GetOpsArgs { + sync.get_cloud_ops(GetOpsArgs { clocks: timestamps, count: OPS_PER_REQUEST, }) @@ -46,7 +42,7 @@ pub async fn run_actor((library, notify): (Arc, Arc)) { sync.ingest .event_tx .send(sd_core_sync::Event::Messages(MessagesEvent { - instance_id: library.sync.instance, + instance_id: sync.instance, has_more: ops.len() == 1000, messages: ops, })) diff --git a/core/src/cloud/sync/mod.rs b/core/crates/cloud-sync/src/lib.rs similarity index 78% rename from core/src/cloud/sync/mod.rs rename to core/crates/cloud-sync/src/lib.rs index dac42b6c3..e178d22a6 100644 --- a/core/src/cloud/sync/mod.rs +++ b/core/crates/cloud-sync/src/lib.rs @@ -1,44 +1,11 @@ -use crate::{library::Library, Node}; use sd_sync::*; use serde::{Deserialize, Serialize}; use serde_json::Value; use uuid::Uuid; -use std::sync::{atomic, Arc}; - -mod ingest; -mod receive; -mod send; - -pub async fn declare_actors(library: &Arc, node: &Arc) { - let ingest_notify = Arc::new(Notify::new()); - let actors = &library.actors; - - let autorun = node.cloud_sync_flag.load(atomic::Ordering::Relaxed); - - let args = (library.clone(), node.clone()); - actors - .declare("Cloud Sync Sender", move || send::run_actor(args), autorun) - .await; - - let args = (library.clone(), node.clone(), ingest_notify.clone()); - actors - .declare( - "Cloud Sync Receiver", - move || receive::run_actor(args), - autorun, - ) - .await; - - let args = (library.clone(), ingest_notify); - actors - .declare( - "Cloud Sync Ingest", - move || ingest::run_actor(args), - autorun, - ) - .await; -} +pub mod ingest; +pub mod receive; +pub mod send; macro_rules! err_break { ($e:expr) => { @@ -64,9 +31,7 @@ macro_rules! err_return { } }; } - pub(crate) use err_return; -use tokio::sync::Notify; pub type CompressedCRDTOperationsForModel = Vec<(Value, Vec)>; diff --git a/core/src/cloud/sync/receive.rs b/core/crates/cloud-sync/src/receive.rs similarity index 89% rename from core/src/cloud/sync/receive.rs rename to core/crates/cloud-sync/src/receive.rs index 68c5f2dd6..ea7a494bc 100644 --- a/core/src/cloud/sync/receive.rs +++ b/core/crates/cloud-sync/src/receive.rs @@ -1,9 +1,6 @@ -use crate::{ - cloud::sync::{err_break, err_return, CompressedCRDTOperations}, - library::Library, - Node, -}; +use crate::{err_break, err_return, CompressedCRDTOperations}; +use sd_cloud_api::RequestConfigProvider; use sd_core_sync::NTP64; use sd_prisma::prisma::{cloud_crdt_operation, instance, PrismaClient, SortOrder}; use sd_sync::CRDTOperation; @@ -22,12 +19,16 @@ use serde_json::to_vec; use tokio::{sync::Notify, time::sleep}; use uuid::Uuid; -pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, Arc)) { - let db = &library.db; - let library_id = library.id; - +pub async fn run_actor( + db: Arc, + library_id: Uuid, + instance_uuid: Uuid, + sync: Arc, + cloud_api_config_provider: Arc, + ingest_notify: Arc, +) { let mut cloud_timestamps = { - let timestamps = library.sync.timestamps.read().await; + let timestamps = sync.timestamps.read().await; let batch = timestamps .keys() @@ -74,9 +75,9 @@ pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, let collections = { use sd_cloud_api::library::message_collections; message_collections::get( - node.cloud_api_config().await, + cloud_api_config_provider.cloud_api_config().await, library_id, - library.instance_uuid, + instance_uuid, instances .into_iter() .map(|i| { @@ -108,8 +109,8 @@ pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, None => { let Some(fetched_library) = err_break!( sd_cloud_api::library::get( - node.cloud_api_config().await, - library.id + cloud_api_config_provider.cloud_api_config().await, + library_id ) .await ) else { @@ -137,7 +138,7 @@ pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, err_break!( create_instance( - db, + &db, collection.instance_uuid, err_break!(BASE64_STANDARD.decode(instance.identity.clone())) ) @@ -152,7 +153,7 @@ pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, &BASE64_STANDARD.decode(collection.contents) ))); - err_break!(write_cloud_ops_to_db(compressed_operations.into_ops(), db).await); + err_break!(write_cloud_ops_to_db(compressed_operations.into_ops(), &db).await); let collection_timestamp = NTP64(collection.end_time.parse().expect("unable to parse time")); diff --git a/core/src/cloud/sync/send.rs b/core/crates/cloud-sync/src/send.rs similarity index 62% rename from core/src/cloud/sync/send.rs rename to core/crates/cloud-sync/src/send.rs index c542f20cd..441a3ef04 100644 --- a/core/src/cloud/sync/send.rs +++ b/core/crates/cloud-sync/src/send.rs @@ -1,19 +1,23 @@ -use crate::{cloud::sync::CompressedCRDTOperations, Node}; +use crate::CompressedCRDTOperations; +use sd_cloud_api::RequestConfigProvider; use sd_core_sync::{GetOpsArgs, SyncMessage, NTP64}; -use sd_prisma::prisma::instance; +use sd_prisma::prisma::{instance, PrismaClient}; use sd_utils::from_bytes_to_uuid; +use uuid::Uuid; use std::{sync::Arc, time::Duration}; use tokio::time::sleep; -use super::{err_break, Library}; - -pub async fn run_actor((library, node): (Arc, Arc)) { - let db = &library.db; - let library_id = library.id; +use super::err_break; +pub async fn run_actor( + db: Arc, + library_id: Uuid, + sync: Arc, + cloud_api_config_provider: Arc, +) { loop { loop { let instances = err_break!( @@ -29,7 +33,7 @@ pub async fn run_actor((library, node): (Arc, Arc)) { let req_adds = err_break!( sd_cloud_api::library::message_collections::request_add( - node.cloud_api_config().await, + cloud_api_config_provider.cloud_api_config().await, library_id, instances, ) @@ -42,22 +46,20 @@ pub async fn run_actor((library, node): (Arc, Arc)) { for req_add in req_adds { let ops = err_break!( - library - .sync - .get_ops(GetOpsArgs { - count: 1000, - clocks: vec![( - req_add.instance_uuid, - NTP64( - req_add - .from_time - .unwrap_or_else(|| "0".to_string()) - .parse() - .expect("couldn't parse ntp64 value"), - ), - )], - }) - .await + sync.get_ops(GetOpsArgs { + count: 1000, + clocks: vec![( + req_add.instance_uuid, + NTP64( + req_add + .from_time + .unwrap_or_else(|| "0".to_string()) + .parse() + .expect("couldn't parse ntp64 value"), + ), + )], + }) + .await ); if ops.is_empty() { @@ -81,12 +83,19 @@ pub async fn run_actor((library, node): (Arc, Arc)) { break; } - err_break!(do_add(node.cloud_api_config().await, library_id, instances,).await); + err_break!( + do_add( + cloud_api_config_provider.cloud_api_config().await, + library_id, + instances, + ) + .await + ); } { // recreate subscription each time so that existing messages are dropped - let mut rx = library.sync.subscribe(); + let mut rx = sync.subscribe(); // wait until Created message comes in loop { diff --git a/core/src/cloud/sync.rs b/core/src/cloud/sync.rs new file mode 100644 index 000000000..f28940d70 --- /dev/null +++ b/core/src/cloud/sync.rs @@ -0,0 +1,65 @@ +use std::sync::{atomic, Arc}; +use tokio::sync::Notify; + +use crate::{library::Library, Node}; + +pub async fn declare_actors(library: &Arc, node: &Arc) { + let ingest_notify = Arc::new(Notify::new()); + let actors = &library.actors; + + let autorun = node.cloud_sync_flag.load(atomic::Ordering::Relaxed); + + actors + .declare( + "Cloud Sync Sender", + { + let library = library.clone(); + let node = node.clone(); + + move || { + sd_core_cloud_sync::send::run_actor( + library.db.clone(), + library.id, + library.sync.clone(), + node.clone(), + ) + } + }, + autorun, + ) + .await; + + actors + .declare( + "Cloud Sync Receiver", + { + let library = library.clone(); + let node = node.clone(); + let ingest_notify = ingest_notify.clone(); + + move || { + sd_core_cloud_sync::receive::run_actor( + library.db.clone(), + library.id, + library.instance_uuid, + library.sync.clone(), + node.clone(), + ingest_notify, + ) + } + }, + autorun, + ) + .await; + + actors + .declare( + "Cloud Sync Ingest", + { + let library = library.clone(); + move || sd_core_cloud_sync::ingest::run_actor(library.sync.clone(), ingest_notify) + }, + autorun, + ) + .await; +} diff --git a/core/src/lib.rs b/core/src/lib.rs index bb109e4bc..3c7dc7429 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -304,6 +304,12 @@ impl Node { } } +impl sd_cloud_api::RequestConfigProvider for Node { + async fn cloud_api_config(self: &Arc) -> sd_cloud_api::RequestConfig { + Node::cloud_api_config(self).await + } +} + /// Error type for Node related errors. #[derive(Error, Debug)] pub enum NodeError { diff --git a/core/src/library/library.rs b/core/src/library/library.rs index 6bf8d2c3d..60c9562cd 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -20,7 +20,7 @@ use tokio::{fs, io, sync::broadcast, sync::RwLock}; use tracing::warn; use uuid::Uuid; -use super::{Actors, LibraryConfig, LibraryManagerError}; +use super::{LibraryConfig, LibraryManagerError}; // TODO: Finish this // pub enum LibraryNew { @@ -53,7 +53,7 @@ pub struct Library { // TODO(@Oscar): Get rid of this with the new invalidation system. event_bus_tx: broadcast::Sender, - pub actors: Arc, + pub actors: Arc, } impl Debug for Library { diff --git a/core/src/library/mod.rs b/core/src/library/mod.rs index 944f40410..49057177f 100644 --- a/core/src/library/mod.rs +++ b/core/src/library/mod.rs @@ -1,4 +1,3 @@ -mod actors; mod config; #[allow(clippy::module_inception)] mod library; @@ -6,7 +5,6 @@ mod manager; mod name; mod statistics; -pub use actors::*; pub use config::*; pub use library::*; pub use manager::*; diff --git a/crates/actors/Cargo.toml b/crates/actors/Cargo.toml new file mode 100644 index 000000000..bdcf2401e --- /dev/null +++ b/crates/actors/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "sd-actors" +version = "0.1.0" +license.workspace = true +edition.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures.workspace = true +tokio.workspace = true diff --git a/core/src/library/actors.rs b/crates/actors/src/lib.rs similarity index 100% rename from core/src/library/actors.rs rename to crates/actors/src/lib.rs diff --git a/crates/cloud-api/src/lib.rs b/crates/cloud-api/src/lib.rs index eae0f61c5..6e8b0cc92 100644 --- a/crates/cloud-api/src/lib.rs +++ b/crates/cloud-api/src/lib.rs @@ -1,5 +1,7 @@ pub mod auth; +use std::{future::Future, sync::Arc}; + use auth::OAuthToken; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -12,6 +14,10 @@ pub struct RequestConfig { pub auth_token: Option, } +pub trait RequestConfigProvider { + fn cloud_api_config(self: &Arc) -> impl Future + Send; +} + #[derive(thiserror::Error, Debug)] #[error("{0}")] pub struct Error(String); diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 50fa4928b..6d833ff50 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.73" +channel = "1.75"