[ENG-835] Duplicating an empty directory doesn't duplicate nested empty directories (#1213)

* Bunch of warnings

* Solving the issue

* sd_core_sync::New

* dumb

---------

Co-authored-by: Brendan Allan <brendonovich@outlook.com>
This commit is contained in:
Ericson "Fogo" Soares 2023-08-13 15:34:46 -03:00 committed by GitHub
parent a74e9aa341
commit 4a3acfa1c8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 102 additions and 90 deletions

1
.clippy.toml Normal file
View file

@ -0,0 +1 @@
allow-unwrap-in-tests = true

View file

@ -118,10 +118,7 @@ impl Actor {
let mut timestamp = { let mut timestamp = {
let mut clocks = self.timestamps.write().await; let mut clocks = self.timestamps.write().await;
clocks *clocks.entry(op.instance).or_insert_with(|| op.timestamp)
.entry(op.instance)
.or_insert_with(|| op.timestamp)
.clone()
}; };
if timestamp < op.timestamp { if timestamp < op.timestamp {

View file

@ -14,19 +14,19 @@ pub struct Manager {
shared: Arc<SharedState>, shared: Arc<SharedState>,
} }
pub struct SyncManagerNew {
pub manager: Manager,
pub rx: broadcast::Receiver<SyncMessage>,
}
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)] #[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
pub struct GetOpsArgs { pub struct GetOpsArgs {
pub clocks: Vec<(Uuid, NTP64)>, pub clocks: Vec<(Uuid, NTP64)>,
pub count: u32, pub count: u32,
} }
pub struct New<T> {
pub manager: T,
pub rx: broadcast::Receiver<SyncMessage>,
}
impl Manager { impl Manager {
pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> SyncManagerNew { pub fn new(db: &Arc<PrismaClient>, instance: Uuid) -> New<Self> {
let (tx, rx) = broadcast::channel(64); let (tx, rx) = broadcast::channel(64);
let timestamps: Timestamps = Default::default(); let timestamps: Timestamps = Default::default();
@ -41,7 +41,7 @@ impl Manager {
let ingest = ingest::Actor::spawn(shared.clone()); let ingest = ingest::Actor::spawn(shared.clone());
SyncManagerNew { New {
manager: Self { shared, tx, ingest }, manager: Self { shared, tx, ingest },
rx, rx,
} }

View file

@ -112,15 +112,14 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
async move { async move {
while let Ok(msg) = sync_rx1.recv().await { while let Ok(msg) = sync_rx1.recv().await {
match msg { if let SyncMessage::Created = msg {
SyncMessage::Created => instance2 instance2
.sync .sync
.ingest .ingest
.event_tx .event_tx
.send(ingest::Event::Notification) .send(ingest::Event::Notification)
.await .await
.unwrap(), .unwrap()
_ => {}
} }
} }
} }

View file

@ -48,8 +48,8 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
} }
}; };
let instant = intervals.entry(progress_event.id).or_insert_with(|| let instant = intervals.entry(progress_event.id).or_insert_with(
Instant::now() Instant::now
); );
if instant.elapsed() <= Duration::from_secs_f64(1.0 / 30.0) { if instant.elapsed() <= Duration::from_secs_f64(1.0 / 30.0) {

View file

@ -9,7 +9,7 @@ use crate::{
util::AbortOnDrop, util::AbortOnDrop,
}; };
use std::path::{Path, PathBuf}; use std::path::PathBuf;
use rspc::{self, alpha::AlphaRouter, ErrorCode}; use rspc::{self, alpha::AlphaRouter, ErrorCode};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};

View file

@ -114,20 +114,20 @@ impl InvalidRequests {
/// ); /// );
/// ``` /// ```
#[macro_export] #[macro_export]
#[allow(clippy::crate_in_macro_def)] // #[allow(clippy::crate_in_macro_def)]
macro_rules! invalidate_query { macro_rules! invalidate_query {
($ctx:expr, $key:literal) => {{ ($ctx:expr, $key:literal) => {{
let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type let ctx: &$crate::library::Library = &$ctx; // Assert the context is the correct type
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
{ {
#[ctor::ctor] #[ctor::ctor]
fn invalidate() { fn invalidate() {
crate::api::utils::INVALIDATION_REQUESTS $crate::api::utils::INVALIDATION_REQUESTS
.lock() .lock()
.unwrap() .unwrap()
.queries .queries
.push(crate::api::utils::InvalidationRequest { .push($crate::api::utils::InvalidationRequest {
key: $key, key: $key,
arg_ty: None, arg_ty: None,
result_ty: None, result_ty: None,
@ -139,23 +139,23 @@ macro_rules! invalidate_query {
::tracing::trace!(target: "sd_core::invalidate-query", "invalidate_query!(\"{}\") at {}", $key, concat!(file!(), ":", line!())); ::tracing::trace!(target: "sd_core::invalidate-query", "invalidate_query!(\"{}\") at {}", $key, concat!(file!(), ":", line!()));
// The error are ignored here because they aren't mission critical. If they fail the UI might be outdated for a bit. // The error are ignored here because they aren't mission critical. If they fail the UI might be outdated for a bit.
ctx.emit(crate::api::CoreEvent::InvalidateOperation( ctx.emit($crate::api::CoreEvent::InvalidateOperation(
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, serde_json::Value::Null, None) $crate::api::utils::InvalidateOperationEvent::dangerously_create($key, serde_json::Value::Null, None)
)) ))
}}; }};
($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr $(,)?) => {{ ($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr $(,)?) => {{
let _: $arg_ty = $arg; // Assert the type the user provided is correct let _: $arg_ty = $arg; // Assert the type the user provided is correct
let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type let ctx: &$crate::library::Library = &$ctx; // Assert the context is the correct type
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
{ {
#[ctor::ctor] #[ctor::ctor]
fn invalidate() { fn invalidate() {
crate::api::utils::INVALIDATION_REQUESTS $crate::api::utils::INVALIDATION_REQUESTS
.lock() .lock()
.unwrap() .unwrap()
.queries .queries
.push(crate::api::utils::InvalidationRequest { .push($crate::api::utils::InvalidationRequest {
key: $key, key: $key,
arg_ty: Some(<$arg_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts { arg_ty: Some(<$arg_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts {
parent_inline: false, parent_inline: false,
@ -172,8 +172,8 @@ macro_rules! invalidate_query {
// The error are ignored here because they aren't mission critical. If they fail the UI might be outdated for a bit. // The error are ignored here because they aren't mission critical. If they fail the UI might be outdated for a bit.
let _ = serde_json::to_value($arg) let _ = serde_json::to_value($arg)
.map(|v| .map(|v|
ctx.emit(crate::api::CoreEvent::InvalidateOperation( ctx.emit($crate::api::CoreEvent::InvalidateOperation(
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, v, None), $crate::api::utils::InvalidateOperationEvent::dangerously_create($key, v, None),
)) ))
) )
.map_err(|_| { .map_err(|_| {
@ -182,17 +182,17 @@ macro_rules! invalidate_query {
}}; }};
($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr, $result_ty:ty: $result:expr $(,)?) => {{ ($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr, $result_ty:ty: $result:expr $(,)?) => {{
let _: $arg_ty = $arg; // Assert the type the user provided is correct let _: $arg_ty = $arg; // Assert the type the user provided is correct
let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type let ctx: &$crate::library::Library = &$ctx; // Assert the context is the correct type
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
{ {
#[ctor::ctor] #[ctor::ctor]
fn invalidate() { fn invalidate() {
crate::api::utils::INVALIDATION_REQUESTS $crate::api::utils::INVALIDATION_REQUESTS
.lock() .lock()
.unwrap() .unwrap()
.queries .queries
.push(crate::api::utils::InvalidationRequest { .push($crate::api::utils::InvalidationRequest {
key: $key, key: $key,
arg_ty: Some(<$arg_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts { arg_ty: Some(<$arg_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts {
parent_inline: false, parent_inline: false,
@ -214,8 +214,8 @@ macro_rules! invalidate_query {
.and_then(|arg| .and_then(|arg|
serde_json::to_value($result) serde_json::to_value($result)
.map(|result| .map(|result|
ctx.emit(crate::api::CoreEvent::InvalidateOperation( ctx.emit($crate::api::CoreEvent::InvalidateOperation(
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, arg, Some(result)), $crate::api::utils::InvalidateOperationEvent::dangerously_create($key, arg, Some(result)),
)) ))
) )
) )

View file

@ -387,7 +387,7 @@ impl Libraries {
identity, identity,
// key_manager, // key_manager,
db, db,
&node, node,
Arc::new(sync.manager), Arc::new(sync.manager),
) )
.await; .await;

View file

@ -137,26 +137,39 @@ impl StatefulJob for FileCopierJobInit {
.expect("We got the children path from the read_dir, so it should be a child of the source path"), .expect("We got the children path from the read_dir, so it should be a child of the source path"),
); );
// Currently not supporting file_name suffixes children files in a directory being copied match get_file_data_from_isolated_file_path(
more_steps.push(FileCopierJobStep { &ctx.library.db,
target_full_path: target_children_full_path, &data.sources_location_path,
source_file_data: get_file_data_from_isolated_file_path( &IsolatedFilePathData::new(
&ctx.library.db, init.source_location_id,
&data.sources_location_path, &data.sources_location_path,
&IsolatedFilePathData::new( &children_path,
init.source_location_id, children_entry
&data.sources_location_path, .metadata()
&children_path, .await
children_entry .map_err(|e| FileIOError::from((&children_path, e)))?
.metadata() .is_dir(),
.await
.map_err(|e| FileIOError::from((&children_path, e)))?
.is_dir(),
)
.map_err(FileSystemJobsError::from)?,
) )
.await?, .map_err(FileSystemJobsError::from)?,
}); )
.await
{
Ok(source_file_data) => {
// Currently not supporting file_name suffixes children files in a directory being copied
more_steps.push(FileCopierJobStep {
target_full_path: target_children_full_path,
source_file_data,
});
}
Err(FileSystemJobsError::FilePathNotFound(path)) => {
// FilePath doesn't exist in the database, it possibly wasn't indexed, so we skip it
warn!(
"Skipping duplicating {} as it wasn't indexed",
path.display()
);
}
Err(e) => return Err(e.into()),
}
} }
Ok(more_steps.into()) Ok(more_steps.into())

View file

@ -95,6 +95,7 @@ pub async fn get_file_data_from_isolated_file_path(
location_path: impl AsRef<Path>, location_path: impl AsRef<Path>,
iso_file_path: &IsolatedFilePathData<'_>, iso_file_path: &IsolatedFilePathData<'_>,
) -> Result<FileData, FileSystemJobsError> { ) -> Result<FileData, FileSystemJobsError> {
let location_path = location_path.as_ref();
db.file_path() db.file_path()
.find_unique(iso_file_path.into()) .find_unique(iso_file_path.into())
.include(file_path_with_object::include()) .include(file_path_with_object::include())
@ -102,16 +103,12 @@ pub async fn get_file_data_from_isolated_file_path(
.await? .await?
.ok_or_else(|| { .ok_or_else(|| {
FileSystemJobsError::FilePathNotFound( FileSystemJobsError::FilePathNotFound(
AsRef::<Path>::as_ref(iso_file_path) location_path.join(iso_file_path).into_boxed_path(),
.to_path_buf()
.into_boxed_path(),
) )
}) })
.and_then(|path_data| { .and_then(|path_data| {
Ok(FileData { Ok(FileData {
full_path: location_path full_path: location_path.join(IsolatedFilePathData::try_from(&path_data)?),
.as_ref()
.join(IsolatedFilePathData::try_from(&path_data)?),
file_path: path_data, file_path: path_data,
}) })
}) })

View file

@ -129,8 +129,7 @@ impl PairingManager {
.get_all() .get_all()
.await .await
.into_iter() .into_iter()
.find(|i| i.id == library_id) .any(|i| i.id == library_id)
.is_some()
{ {
self.emit_progress(pairing_id, PairingStatus::LibraryAlreadyExists); self.emit_progress(pairing_id, PairingStatus::LibraryAlreadyExists);
@ -246,7 +245,7 @@ impl PairingManager {
.send(P2PEvent::PairingRequest { .send(P2PEvent::PairingRequest {
id: pairing_id, id: pairing_id,
name: remote_instance.node_name.clone(), name: remote_instance.node_name.clone(),
os: remote_instance.node_platform.clone().into(), os: remote_instance.node_platform.into(),
}) })
.ok(); .ok();

View file

@ -222,8 +222,8 @@ mod tests {
node_id: Uuid::new_v4(), node_id: Uuid::new_v4(),
node_name: "Node Name".into(), node_name: "Node Name".into(),
node_platform: Platform::current(), node_platform: Platform::current(),
last_seen: Utc::now().into(), last_seen: Utc::now(),
date_created: Utc::now().into(), date_created: Utc::now(),
}; };
{ {

View file

@ -263,7 +263,7 @@ mod originator {
dbg!(&library.instances); dbg!(&library.instances);
// TODO: Deduplicate any duplicate peer ids -> This is an edge case but still // TODO: Deduplicate any duplicate peer ids -> This is an edge case but still
for (_, instance) in &library.instances { for instance in library.instances.values() {
let InstanceState::Connected(peer_id) = *instance else { let InstanceState::Connected(peer_id) = *instance else {
continue; continue;
}; };
@ -276,12 +276,7 @@ mod originator {
"Alerting peer '{peer_id:?}' of new sync events for library '{library_id:?}'" "Alerting peer '{peer_id:?}' of new sync events for library '{library_id:?}'"
); );
let mut stream = p2p let mut stream = p2p.manager.stream(peer_id).await.map_err(|_| ()).unwrap(); // TODO: handle providing incorrect peer id
.manager
.stream(peer_id.clone())
.await
.map_err(|_| ())
.unwrap(); // TODO: handle providing incorrect peer id
stream stream
.write_all(&Header::Sync(library_id).to_bytes()) .write_all(&Header::Sync(library_id).to_bytes())

View file

@ -27,10 +27,10 @@ impl SyncMessage {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use sd_core_sync::NTP64; // use sd_core_sync::NTP64;
use sd_sync::SharedOperation; // use sd_sync::SharedOperation;
use serde_json::Value; // use serde_json::Value;
use uuid::Uuid; // use uuid::Uuid;
use super::*; use super::*;

View file

@ -42,7 +42,8 @@ impl PreferenceValue {
pub fn new(value: impl Serialize) -> Self { pub fn new(value: impl Serialize) -> Self {
let mut bytes = vec![]; let mut bytes = vec![];
rmp_serde::encode::write_named(&mut bytes, &value).unwrap(); rmp_serde::encode::write_named(&mut bytes, &value)
.expect("Failed to serialize preference value");
// let value = rmpv::decode::read_value(&mut bytes.as_slice()).unwrap(); // let value = rmpv::decode::read_value(&mut bytes.as_slice()).unwrap();
@ -52,7 +53,8 @@ impl PreferenceValue {
pub fn from_value(value: Value) -> Self { pub fn from_value(value: Value) -> Self {
let mut bytes = vec![]; let mut bytes = vec![];
rmpv::encode::write_value(&mut bytes, &value).unwrap(); rmpv::encode::write_value(&mut bytes, &value)
.expect("Failed to serialize preference value");
Self(bytes) Self(bytes)
} }
@ -76,6 +78,7 @@ pub enum Entry {
Nested(Entries), Nested(Entries),
} }
#[allow(clippy::unwrap_used, clippy::panic)]
impl Entry { impl Entry {
pub fn expect_value<T: DeserializeOwned>(self) -> T { pub fn expect_value<T: DeserializeOwned>(self) -> T {
match self { match self {

View file

@ -1,14 +1,15 @@
mod kv;
pub use kv::*;
use specta::Type;
use crate::prisma::PrismaClient; use crate::prisma::PrismaClient;
use std::collections::HashMap; use std::collections::HashMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use specta::Type;
use tracing::error;
use uuid::Uuid; use uuid::Uuid;
mod kv;
pub use kv::*;
// Preferences are a set of types that are serialized as a list of key-value pairs, // Preferences are a set of types that are serialized as a list of key-value pairs,
// where nested type keys are serialized as a dot-separated path. // where nested type keys are serialized as a dot-separated path.
// They are serailized as a list because this allows preferences to be a synchronisation boundary, // They are serailized as a list because this allows preferences to be a synchronisation boundary,
@ -36,9 +37,15 @@ impl LibraryPreferences {
let prefs = PreferenceKVs::new( let prefs = PreferenceKVs::new(
kvs.into_iter() kvs.into_iter()
.filter_map(|data| { .filter_map(|data| {
let a = rmpv::decode::read_value(&mut data.value?.as_slice()).unwrap(); rmpv::decode::read_value(&mut data.value?.as_slice())
.map_err(|e| error!("{e:#?}"))
Some((PreferenceKey::new(data.key), PreferenceValue::from_value(a))) .ok()
.map(|value| {
(
PreferenceKey::new(data.key),
PreferenceValue::from_value(value),
)
})
}) })
.collect(), .collect(),
); );
@ -101,9 +108,10 @@ where
entries entries
.into_iter() .into_iter()
.map(|(key, value)| { .map(|(key, value)| {
let id = Uuid::parse_str(&key).unwrap(); (
Uuid::parse_str(&key).expect("invalid uuid in preferences"),
(id, V::from_entries(value.expect_nested())) V::from_entries(value.expect_nested()),
)
}) })
.collect() .collect()
} }

View file

@ -276,7 +276,7 @@ mod tests {
.await .await
.unwrap(); .unwrap();
assert!(true, "recv a closed"); // assert!(true, "recv a closed");
} }
}); });
@ -297,7 +297,7 @@ mod tests {
.await .await
.unwrap(); .unwrap();
assert!(true, "recv b closed"); // assert!(true, "recv b closed");
} }
}); });