[ENG-1426] Apply clippy --fix with moderate lints (#1779)

* chore(clippy): fix

* chore: formatting

* fix: add cfg back to macos crate

* remove cfg to see if clippy works

* Restore macOS crate level conditional compilation

* Fix more warnings

---------

Co-authored-by: Vítor Vasconcellos <vasconcellos.dev@gmail.com>
Co-authored-by: Ericson Fogo Soares <ericson.ds999@gmail.com>
jake 2023-11-28 17:50:11 +00:00 committed by GitHub
parent b743bb9a9f
commit c79c190c78
40 changed files with 181 additions and 139 deletions
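The hunks below are mostly mechanical applications of a handful of clippy lints rather than behavioural changes. As a rough sketch (names invented for illustration, not taken verbatim from the diff), the recurring before/after patterns look like this:

// Illustrative sketch of the recurring lint patterns in this commit.
fn main() {
    // Unit patterns: when the Ok value is `()`, spell it out (`|_|` becomes `|()|`).
    let opened: Result<(), &str> = Ok(());
    let _ = opened.map(|()| "all good");

    // clippy::cast_lossless: prefer `i64::from(x)` over `x as i64` for infallible widening.
    let count: u32 = 50;
    let _take: i64 = i64::from(count);

    // clippy::uninlined_format_args: inline the variable into the format string.
    let subpath = "video.ts";
    println!("testing {subpath}...");

    // clippy::unreadable_literal: group the digits of large literals.
    let _block_size: u32 = 131_072; // 128 KiB
}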


@ -1,4 +1,5 @@
#![cfg(target_os = "macos")]
use swift_rs::{swift, Bool, Int, SRData, SRObjectArray, SRString};
pub type NSObject = *mut std::ffi::c_void;


@ -55,7 +55,7 @@ pub async fn open_file_paths(
};
open_result
.map(|_| OpenFilePathResult::AllGood(id))
.map(|()| OpenFilePathResult::AllGood(id))
.unwrap_or_else(|err| {
error!("Failed to open logs dir: {err}");
OpenFilePathResult::OpenError(id, err.to_string())
@ -331,7 +331,7 @@ pub async fn open_ephemeral_file_with(paths_and_urls: Vec<PathAndUrl>) -> Result
#[cfg(target_os = "macos")]
if let Some(path) = path.to_str().map(str::to_string) {
if let Err(e) = spawn_blocking(move || {
sd_desktop_macos::open_file_paths_with(&[path], &url)
sd_desktop_macos::open_file_paths_with(&[path], &url);
})
.await
{
@ -452,7 +452,7 @@ pub async fn reveal_items(
.await
.unwrap_or_default()
.into_iter()
.flat_map(|location| location.path.map(Into::into)),
.filter_map(|location| location.path.map(Into::into)),
);
}
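For context, the `flat_map` to `filter_map` change above is what clippy suggests (likely `clippy::flat_map_option`) when the closure returns an `Option`: `filter_map` states the filtering intent directly and produces the same items. A minimal sketch with simplified, made-up types:

// Sketch: an Option-returning closure pairs naturally with filter_map.
struct Location {
    path: Option<String>,
}

fn location_paths(locations: Vec<Location>) -> Vec<String> {
    locations
        .into_iter()
        .filter_map(|location| location.path) // previously flat_map; same output here
        .collect()
}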


@ -47,7 +47,7 @@ async fn set_menu_bar_item_state(_window: tauri::Window, _id: String, _enabled:
.menu_handle()
.get_item(&_id)
.set_enabled(_enabled)
.expect("Unable to modify menu item")
.expect("Unable to modify menu item");
}
}
@ -65,7 +65,7 @@ fn reload_webview_inner(webview: PlatformWebview) {
#[cfg(target_os = "macos")]
{
unsafe {
sd_desktop_macos::reload_webview(&(webview.inner() as _));
sd_desktop_macos::reload_webview(&webview.inner().cast());
}
}
#[cfg(target_os = "linux")]
@ -245,18 +245,16 @@ async fn main() -> tauri::Result<()> {
#[cfg(target_os = "macos")]
{
use sd_desktop_macos::*;
use sd_desktop_macos::{blur_window_background, set_titlebar_style};
let nswindow = window.ns_window().unwrap();
unsafe { set_titlebar_style(&nswindow, true) };
unsafe { blur_window_background(&nswindow) };
let menu_handle = window.menu_handle();
tokio::spawn({
let libraries = node.libraries.clone();
let menu_handle = menu_handle.clone();
let menu_handle = window.menu_handle();
async move {
if libraries.get_all().await.is_empty() {
menu::set_library_locked_menu_items_enabled(menu_handle, false);


@ -3,7 +3,7 @@ use tauri::{Manager, Menu, WindowMenuEvent, Wry};
#[cfg(target_os = "macos")]
use tauri::{AboutMetadata, CustomMenuItem, MenuItem, Submenu};
pub(super) fn get_menu() -> Menu {
pub fn get_menu() -> Menu {
#[cfg(target_os = "macos")]
{
custom_menu_bar()
@ -131,7 +131,7 @@ fn custom_menu_bar() -> Menu {
.add_submenu(Submenu::new("Window", window_menu))
}
pub(crate) fn handle_menu_event(event: WindowMenuEvent<Wry>) {
pub fn handle_menu_event(event: WindowMenuEvent<Wry>) {
match event.menu_item_id() {
"quit" => {
let app = event.window().app_handle();
@ -180,12 +180,9 @@ pub(crate) fn handle_menu_event(event: WindowMenuEvent<Wry>) {
/// If any are explicitly marked with `.disabled()` in the `custom_menu_bar()` function, this won't have an effect.
We include them in the locked menu IDs anyway for future-proofing, in case someone forgets.
#[cfg(target_os = "macos")]
pub(super) fn set_library_locked_menu_items_enabled(
handle: tauri::window::MenuHandle,
enabled: bool,
) {
pub fn set_library_locked_menu_items_enabled(handle: tauri::window::MenuHandle, enabled: bool) {
LIBRARY_LOCKED_MENU_IDS
.iter()
.try_for_each(|id| handle.get_item(id).set_enabled(enabled))
.expect("Unable to disable menu items (there are no libraries present, so certain options should be hidden)")
.expect("Unable to disable menu items (there are no libraries present, so certain options should be hidden)");
}


@ -11,7 +11,7 @@ impl Update {
fn new(update: &tauri::updater::UpdateResponse<impl tauri::Runtime>) -> Self {
Self {
version: update.latest_version().to_string(),
body: update.body().map(|b| b.to_string()),
body: update.body().map(ToString::to_string),
}
}
}
@ -61,8 +61,9 @@ pub async fn check_for_update(app: tauri::AppHandle) -> Result<Option<Update>, S
"updater",
update
.clone()
.map(|update| UpdateEvent::UpdateAvailable { update })
.unwrap_or(UpdateEvent::NoUpdateAvailable),
.map_or(UpdateEvent::NoUpdateAvailable, |update| {
UpdateEvent::UpdateAvailable { update }
}),
)
.ok();
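The updater change above follows clippy's `map_unwrap_or` suggestion: `.map(f).unwrap_or(default)` collapses into `.map_or(default, f)`, with the fallback listed first. A small sketch with stand-in types:

// Sketch: map_or takes the fallback value first, then the mapping closure.
enum UpdateEvent {
    UpdateAvailable(String),
    NoUpdateAvailable,
}

fn to_event(update: Option<String>) -> UpdateEvent {
    update.map_or(UpdateEvent::NoUpdateAvailable, UpdateEvent::UpdateAvailable)
}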


@ -1,7 +1,10 @@
use futures::{future::join_all, StreamExt};
use futures_channel::mpsc;
use once_cell::sync::{Lazy, OnceCell};
use rspc::internal::jsonrpc::{self, *};
use rspc::internal::jsonrpc::{
self, handle_json_rpc, OwnedMpscSender, Request, RequestId, Response, Sender,
SubscriptionUpgrade,
};
use sd_core::{api::Router, Node};
use serde_json::{from_str, from_value, to_string, Value};
use std::{


@ -3,7 +3,7 @@ use std::sync::Arc;
use sd_core::Node;
use tokio::signal;
/// shutdown_signal will inform axum to gracefully shutdown when the process is asked to shutdown.
/// `shutdown_signal` will inform axum to gracefully shutdown when the process is asked to shutdown.
pub async fn axum_shutdown_signal(node: Arc<Node>) {
let ctrl_c = async {
signal::ctrl_c()
@ -23,8 +23,8 @@ pub async fn axum_shutdown_signal(node: Arc<Node>) {
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
() = ctrl_c => {},
() = terminate => {},
}
println!("signal received, starting graceful shutdown");
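The `_ = ctrl_c` to `() = ctrl_c` change is clippy asking for explicit unit patterns in `select!` arms (likely `clippy::ignored_unit_patterns`). In isolation, the updated shutdown future looks roughly like this, assuming tokio's `signal` and `macros` features:

// Sketch: select! over two unit futures, with explicit `()` patterns.
async fn wait_for_shutdown() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };
    let terminate = std::future::pending::<()>();

    tokio::select! {
        () = ctrl_c => {},
        () = terminate => {},
    }
    println!("signal received, starting graceful shutdown");
}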


@ -1,5 +1,5 @@
use sd_prisma::prisma::*;
use sd_sync::*;
use sd_prisma::prisma::{relation_operation, shared_operation};
use sd_sync::{CRDTOperation, CRDTOperationType, RelationOperation, SharedOperation};
use uhlc::NTP64;
use uuid::Uuid;
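This hunk, and several below, replace glob imports with explicit lists, which is clippy's `wildcard_imports` lint. A generic sketch of the same idea using std (the `sd_prisma`/`sd_sync` paths above are project-specific):

// Sketch: name the items a module actually uses instead of glob-importing.
// use std::collections::*;               // before, flagged by clippy::wildcard_imports
use std::collections::{HashMap, HashSet}; // after

fn main() {
    let _seen: HashSet<u32> = HashSet::new();
    let _counts: HashMap<String, u32> = HashMap::new();
}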


@ -1,14 +1,20 @@
use std::{ops::Deref, sync::Arc};
use sd_prisma::{prisma::*, prisma_sync::ModelSyncData};
use sd_sync::*;
use sd_prisma::{
prisma::{instance, relation_operation, shared_operation, SortOrder},
prisma_sync::ModelSyncData,
};
use sd_sync::{CRDTOperation, CRDTOperationType, RelationOperation, SharedOperation};
use sd_utils::uuid_to_bytes;
use serde_json::to_vec;
use tokio::sync::{mpsc, Mutex};
use uhlc::{Timestamp, NTP64};
use uuid::Uuid;
use crate::{actor::*, wait, SharedState};
use crate::{
actor::{create_actor_io, ActorIO, ActorTypes},
wait, SharedState,
};
#[derive(Debug)]
#[must_use]


@ -5,8 +5,8 @@ mod db_operation;
pub mod ingest;
mod manager;
use sd_prisma::prisma::*;
use sd_sync::*;
use sd_prisma::prisma::{instance, relation_operation, shared_operation, PrismaClient};
use sd_sync::{CRDTOperation, RelationOperation, SharedOperation};
use std::{
collections::HashMap,
@ -33,6 +33,7 @@ pub struct SharedState {
pub clock: uhlc::HLC,
}
#[must_use]
pub fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> shared_operation::Create {
shared_operation::Create {
id: op.id.as_bytes().to_vec(),
@ -46,6 +47,7 @@ pub fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> shared_o
}
}
#[must_use]
pub fn relation_op_db(
op: &CRDTOperation,
relation_op: &RelationOperation,


@ -1,8 +1,11 @@
use sd_prisma::prisma::*;
use sd_sync::*;
use sd_prisma::prisma::{instance, relation_operation, shared_operation, PrismaClient, SortOrder};
use sd_sync::{CRDTOperation, CRDTOperationType, OperationFactory};
use sd_utils::uuid_to_bytes;
use crate::{db_operation::*, *};
use crate::{
db_operation::{relation_include, shared_include, DbOperation},
ingest, relation_op_db, shared_op_db, SharedState, SyncMessage, Timestamps, NTP64,
};
use std::{
cmp::Ordering,
ops::Deref,
@ -54,7 +57,7 @@ impl Manager {
let ingest = ingest::Actor::spawn(shared.clone());
New {
manager: Self { shared, tx, ingest },
manager: Self { tx, ingest, shared },
rx,
}
}
@ -169,12 +172,12 @@ impl Manager {
._batch((
db.shared_operation()
.find_many(db_args!(args, shared_operation))
.take(args.count as i64)
.take(i64::from(args.count))
.order_by(shared_operation::timestamp::order(SortOrder::Asc))
.include(shared_include::include()),
db.relation_operation()
.find_many(db_args!(args, relation_operation))
.take(args.count as i64)
.take(i64::from(args.count))
.order_by(relation_operation::timestamp::order(SortOrder::Asc))
.include(relation_include::include()),
))


@ -70,7 +70,7 @@ impl Instance {
uuid_to_bytes(right.id),
vec![],
vec![],
"".to_string(),
String::new(),
0,
Utc::now().into(),
Utc::now().into(),
@ -87,7 +87,7 @@ impl Instance {
uuid_to_bytes(left.id),
vec![],
vec![],
"".to_string(),
String::new(),
0,
Utc::now().into(),
Utc::now().into(),
@ -112,14 +112,14 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
async move {
while let Ok(msg) = sync_rx1.recv().await {
if let SyncMessage::Created = msg {
if matches!(msg, SyncMessage::Created) {
instance2
.sync
.ingest
.event_tx
.send(ingest::Event::Notification)
.await
.unwrap()
.unwrap();
}
}
}
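The test change above swaps an `if let` on a unit-like variant for `matches!`, which reads as a plain boolean check. A sketch assuming a `SyncMessage`-shaped enum:

// Sketch: matches! for a yes/no pattern test (enum shape assumed for the example).
enum SyncMessage {
    Ingested,
    Created,
}

fn is_created(msg: &SyncMessage) -> bool {
    matches!(msg, SyncMessage::Created)
}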


@ -79,7 +79,7 @@ impl FilePathFilterArgs {
Ok(match self {
Self::Locations(v) => v
.to_param(
.into_param(
file_path::location_id::in_vec,
file_path::location_id::not_in_vec,
)
@ -120,13 +120,13 @@ impl FilePathFilterArgs {
.unwrap_or_default()
}
Self::Name(v) => v
.to_param(name::contains, name::starts_with, name::ends_with, |s| {
.into_param(name::contains, name::starts_with, name::ends_with, |s| {
name::equals(Some(s))
})
.map(|v| vec![v])
.unwrap_or_default(),
Self::Extension(v) => v
.to_param(extension::in_vec, extension::not_in_vec)
.into_param(extension::in_vec, extension::not_in_vec)
.map(|v| vec![v])
.unwrap_or_default(),
Self::CreatedAt(v) => {


@ -122,14 +122,14 @@ impl ObjectFilterArgs {
Self::Favorite(v) => vec![favorite::equals(Some(v))],
Self::Hidden(v) => v.to_param().map(|v| vec![v]).unwrap_or_default(),
Self::Tags(v) => v
.to_param(
.into_param(
|v| tags::some(vec![tag_on_object::tag_id::in_vec(v)]),
|v| tags::none(vec![tag_on_object::tag_id::in_vec(v)]),
)
.map(|v| vec![v])
.unwrap_or_default(),
Self::Kind(v) => v
.to_param(kind::in_vec, kind::not_in_vec)
.into_param(kind::in_vec, kind::not_in_vec)
.map(|v| vec![v])
.unwrap_or_default(),
Self::DateAccessed(v) => {


@ -1,12 +1,14 @@
use crate::{api::utils::library, invalidate_query, prisma::saved_search};
use sd_utils::chain_optional_iter;
use chrono::{DateTime, FixedOffset, Utc};
use rspc::alpha::AlphaRouter;
use sd_utils::chain_optional_iter;
use serde::{Deserialize, Serialize};
use serde::{de::IgnoredAny, Deserialize, Serialize};
use specta::Type;
use tracing::error;
use uuid::Uuid;
use crate::{api::utils::library, invalidate_query, prisma::saved_search};
use super::{Ctx, R};
pub(crate) fn mount() -> AlphaRouter<Ctx> {
@ -44,13 +46,15 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
[
args.filters
.map(|s| {
serde_json::to_string(
&serde_json::from_str::<serde_json::Value>(&s)
.unwrap(),
)
.unwrap()
// https://github.com/serde-rs/json/issues/579
// https://docs.rs/serde/latest/serde/de/struct.IgnoredAny.html
if let Err(e) = serde_json::from_str::<IgnoredAny>(&s) {
error!("failed to parse filters: {e:#?}");
None
} else {
Some(s)
}
})
.map(Some)
.map(saved_search::filters::set),
args.search.map(Some).map(saved_search::search::set),
args.description
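The saved-search change above validates the incoming filter string with `serde::de::IgnoredAny` instead of round-tripping it through `serde_json::Value`, so malformed JSON is rejected without building or re-serializing a tree. A minimal sketch of the same trick, assuming the serde and serde_json crates already used here:

// Sketch: IgnoredAny drives the deserializer purely for validation.
use serde::de::IgnoredAny;

fn validated_filters(s: String) -> Option<String> {
    match serde_json::from_str::<IgnoredAny>(&s) {
        Ok(_) => Some(s),
        Err(e) => {
            eprintln!("failed to parse filters: {e:#?}");
            None
        }
    }
}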


@ -33,6 +33,7 @@ pub enum MaybeNot<T> {
}
impl<T> MaybeNot<T> {
#[allow(unused)]
pub fn into_prisma<R: From<prisma_client_rust::Operator<R>>>(self, param: fn(T) -> R) -> R {
match self {
Self::None(v) => param(v),
@ -71,7 +72,7 @@ impl<T> InOrNotIn<T> {
}
}
pub fn to_param<TParam>(
pub fn into_param<TParam>(
self,
in_fn: fn(Vec<T>) -> TParam,
not_in_fn: fn(Vec<T>) -> TParam,
@ -105,7 +106,7 @@ impl TextMatch {
}
// 3. Update the to_param method of TextMatch
pub fn to_param<TParam>(
pub fn into_param<TParam>(
self,
contains_fn: fn(String) -> TParam,
starts_with_fn: fn(String) -> TParam,
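The `to_param` to `into_param` renames follow clippy's `wrong_self_convention`: a method that consumes `self` to convert it should use an `into_` prefix, while `to_` implies borrowing. A simplified sketch of the pattern (the real `InOrNotIn` and `TextMatch` have more variants and return `Option`):

// Sketch: consuming conversion methods are named into_* (stand-in two-variant enum).
enum InOrNotIn<T> {
    In(Vec<T>),
    NotIn(Vec<T>),
}

impl<T> InOrNotIn<T> {
    fn into_param<P>(self, in_fn: fn(Vec<T>) -> P, not_in_fn: fn(Vec<T>) -> P) -> P {
        match self {
            Self::In(v) => in_fn(v),
            Self::NotIn(v) => not_in_fn(v),
        }
    }
}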


@ -34,6 +34,7 @@ impl DiskAccess {
/// This function is a no-op on non-MacOS systems.
///
/// Once run, it will open the "Full Disk Access" prompt.
#[allow(clippy::missing_const_for_fn)]
pub fn request_fda() -> Result<()> {
#[cfg(target_os = "macos")]
{


@ -391,7 +391,7 @@ mod test {
#[tokio::test]
async fn magic_bytes() {
async fn test_path(subpath: &str) -> Option<Extension> {
println!("testing {}...", subpath);
println!("testing {subpath}...");
Extension::resolve_conflicting(subpath.split('.').last().unwrap(), true).await
}
// Video extension tests


@ -176,12 +176,12 @@ impl Extension {
pub async fn resolve_conflicting(
path: impl AsRef<Path>,
always_check_magic_bytes: bool,
) -> Option<Extension> {
) -> Option<Self> {
let Some(ext_str) = path.as_ref().extension().and_then(OsStr::to_str) else {
return None;
};
let Some(ext) = Extension::from_str(ext_str) else {
let Some(ext) = Self::from_str(ext_str) else {
return None;
};
@ -215,18 +215,18 @@ impl Extension {
}
}
ExtensionPossibility::Conflicts(ext) => match ext_str {
"ts" if ext.iter().any(|e| matches!(e, Extension::Video(_))) => {
"ts" if ext.iter().any(|e| matches!(e, Self::Video(_))) => {
verify_magic_bytes(VideoExtension::Ts, file)
.await
.map_or(Some(Extension::Code(CodeExtension::Ts)), |video_ext| {
Some(Extension::Video(video_ext))
.map_or(Some(Self::Code(CodeExtension::Ts)), |video_ext| {
Some(Self::Video(video_ext))
})
}
"mts" if ext.iter().any(|e| matches!(e, Extension::Video(_))) => {
"mts" if ext.iter().any(|e| matches!(e, Self::Video(_))) => {
verify_magic_bytes(VideoExtension::Mts, file)
.await
.map_or(Some(Extension::Code(CodeExtension::Mts)), |video_ext| {
Some(Extension::Video(video_ext))
.map_or(Some(Self::Code(CodeExtension::Mts)), |video_ext| {
Some(Self::Video(video_ext))
})
}
_ => None,


@ -95,7 +95,7 @@ fn looks_utf8(buf: &[u8], partial: bool) -> bool {
*/
if TEXT_CHARS[(*byte) as usize] != T {
ctrl = true
ctrl = true;
}
/* 10xxxxxx never 1st byte */
} else if (byte & 0x40) == 0 {
@ -176,11 +176,11 @@ fn looks_ucs16(buf: &[u8]) -> Option<UCS16> {
};
let mut hi: u32 = 0;
for chunck in buf[2..].chunks_exact(2) {
for chunk in buf[2..].chunks_exact(2) {
let mut uc = (if bigend {
(chunck[1] as u32) | (chunck[0] as u32) << 8
u32::from(chunk[1]) | u32::from(chunk[0]) << 8
} else {
(chunck[0] as u32) | (chunck[1] as u32) << 8
u32::from(chunk[0]) | u32::from(chunk[1]) << 8
}) & 0xffff;
match uc {
@ -239,17 +239,17 @@ fn looks_ucs32(buf: &[u8]) -> Option<UCS32> {
return None;
};
for chunck in buf[4..].chunks_exact(4) {
for chunk in buf[4..].chunks_exact(4) {
let uc: u32 = if bigend {
(chunck[3] as u32)
| (chunck[2] as u32) << 8
| (chunck[1] as u32) << 16
| (chunck[0] as u32) << 24
u32::from(chunk[3])
| u32::from(chunk[2]) << 8
| u32::from(chunk[1]) << 16
| u32::from(chunk[0]) << 24
} else {
(chunck[0] as u32)
| (chunck[1] as u32) << 8
| (chunck[2] as u32) << 16
| (chunck[3] as u32) << 24
u32::from(chunk[0])
| u32::from(chunk[1]) << 8
| u32::from(chunk[2]) << 16
| u32::from(chunk[3]) << 24
};
if uc == 0xfffe {
@ -267,6 +267,7 @@ fn looks_ucs32(buf: &[u8]) -> Option<UCS32> {
})
}
#[must_use]
pub fn is_text(data: &[u8], partial: bool) -> Option<&'static str> {
if data.is_empty() {
return None;
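Besides the `chunck` to `chunk` spelling fix, the hunk above applies `clippy::cast_lossless`: infallible widening casts become `u32::from(...)`. The big-endian assembly, reduced to a sketch:

// Sketch: build a big-endian u32 from four bytes with lossless From conversions.
fn be_code_unit(chunk: [u8; 4]) -> u32 {
    u32::from(chunk[3])
        | u32::from(chunk[2]) << 8
        | u32::from(chunk[1]) << 16
        | u32::from(chunk[0]) << 24
}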


@ -14,11 +14,11 @@ use crate::{spacetunnel::RemoteIdentity, ManagerConfig, Mdns, ServiceEventIntern
type ServiceName = String;
pub(crate) type ListenAddrs = HashSet<SocketAddr>;
pub(crate) type State = Arc<RwLock<DiscoveryManagerState>>;
pub type ListenAddrs = HashSet<SocketAddr>;
pub type State = Arc<RwLock<DiscoveryManagerState>>;
/// DiscoveryManager controls all user-defined [Service]'s and connects them with the network through mDNS and other discovery protocols
pub(crate) struct DiscoveryManager {
/// `DiscoveryManager` controls all user-defined [Service]'s and connects them with the network through mDNS and other discovery protocols
pub struct DiscoveryManager {
pub(crate) state: State,
pub(crate) listen_addrs: ListenAddrs,
pub(crate) application_name: &'static str,
@ -86,7 +86,7 @@ impl DiscoveryManager {
self.do_advertisement();
}
_ = poll_fn(|cx| {
() = poll_fn(|cx| {
if let Some(mdns) = &mut self.mdns {
return mdns.poll(cx, &self.listen_addrs, &self.state);
}
@ -105,7 +105,7 @@ impl DiscoveryManager {
#[derive(Debug, Clone)]
#[allow(clippy::type_complexity)]
pub(crate) struct DiscoveryManagerState {
pub struct DiscoveryManagerState {
/// A list of services the current node is advertising w/ their metadata
pub(crate) services: HashMap<
ServiceName,
@ -131,6 +131,7 @@ pub(crate) struct DiscoveryManagerState {
}
impl DiscoveryManagerState {
#[must_use]
pub fn new() -> (Arc<RwLock<Self>>, mpsc::Receiver<String>) {
let (service_shutdown_tx, service_shutdown_rx) = mpsc::channel(10);
@ -148,7 +149,7 @@ impl DiscoveryManagerState {
}
#[derive(Debug, Clone)]
pub(crate) struct DiscoveredPeerCandidate {
pub struct DiscoveredPeerCandidate {
pub(crate) peer_id: PeerId,
pub(crate) meta: HashMap<String, String>,
pub(crate) addresses: Vec<SocketAddr>,


@ -26,7 +26,7 @@ use crate::{
/// TODO
const MDNS_READVERTISEMENT_INTERVAL: Duration = Duration::from_secs(60); // Every minute re-advertise
pub(crate) struct Mdns {
pub struct Mdns {
identity: RemoteIdentity,
peer_id: PeerId,
service_name: String,
@ -48,7 +48,7 @@ impl Mdns {
Ok(Self {
identity,
peer_id,
service_name: format!("_{}._udp.local.", application_name),
service_name: format!("_{application_name}._udp.local."),
advertised_services: Vec::new(),
mdns_daemon,
next_mdns_advertisement: Box::pin(sleep_until(Instant::now())), // Trigger an advertisement immediately
@ -63,7 +63,7 @@ impl Mdns {
// TODO: Second stage rate-limit
let mut ports_to_service = HashMap::new();
for addr in listen_addrs.iter() {
for addr in listen_addrs {
ports_to_service
.entry(addr.port())
.or_insert_with(Vec::new)
@ -74,7 +74,7 @@ impl Mdns {
let mut advertised_services_to_remove = self.advertised_services.clone();
let state = state.read().unwrap_or_else(PoisonError::into_inner);
for (port, ips) in ports_to_service.into_iter() {
for (port, ips) in ports_to_service {
for (service_name, (_, metadata)) in &state.services {
let Some(metadata) = metadata else {
continue;
@ -126,7 +126,7 @@ impl Mdns {
// TODO: Do a proper diff and remove old services
trace!("advertising mdns service: {:?}", service);
match self.mdns_daemon.register(service) {
Ok(_) => {}
Ok(()) => {}
Err(err) => warn!("error registering mdns service: {}", err),
}
}


@ -2,6 +2,6 @@ mod manager;
mod mdns;
mod service;
pub(crate) use manager::*;
pub(crate) use mdns::*;
pub use manager::*;
pub use mdns::*;
pub use service::*;


@ -67,6 +67,7 @@ impl<TMeta: Metadata> Service<TMeta> {
})
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
@ -80,7 +81,7 @@ impl<TMeta: Metadata> Service<TMeta> {
.get_mut(&self.name)
{
let meta = meta.to_hashmap();
let did_change = services_meta.as_ref().map(|v| *v == meta).unwrap_or(false);
let did_change = services_meta.as_ref().is_some_and(|v| *v == meta);
*services_meta = Some(meta);
if did_change {
@ -250,7 +251,7 @@ pub enum ServiceEvent<TMeta> {
// Type-erased version of [ServiceEvent].
#[derive(Debug, Clone)]
pub(crate) enum ServiceEventInternal {
pub enum ServiceEventInternal {
Discovered {
identity: RemoteIdentity,
metadata: HashMap<String, String>,
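The `did_change` line above replaces `.map(|v| *v == meta).unwrap_or(false)` with `Option::is_some_and`, which expresses the same check in one call. A tiny sketch of the shape:

// Sketch: is_some_and collapses map + unwrap_or(false) on an Option.
fn meta_matches(stored: Option<String>, incoming: &str) -> bool {
    stored.is_some_and(|v| v == incoming)
}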


@ -29,7 +29,7 @@ use crate::{
// State of the manager that may infrequently change
// These are broken out so updates to them can be done in sync (With single RwLock lock)
#[derive(Debug)]
pub(crate) struct DynamicManagerState {
pub struct DynamicManagerState {
pub(crate) config: ManagerConfig,
pub(crate) ipv4_listener_id: Option<Result<ListenerId, String>>,
pub(crate) ipv4_port: Option<u16>,
@ -80,7 +80,7 @@ impl Manager {
let config2 = config.clone();
let (discovery_state, service_shutdown_rx) = DiscoveryManagerState::new();
let this = Arc::new(Self {
application_name: format!("/{}/spacetime/1.0.0", application_name),
application_name: format!("/{application_name}/spacetime/1.0.0"),
identity: keypair.to_identity(),
stream_id: AtomicU64::new(0),
state: RwLock::new(DynamicManagerState {
@ -139,7 +139,7 @@ impl Manager {
pub(crate) async fn emit(&self, event: ManagerStreamAction) {
match self.event_stream_tx.send(event).await {
Ok(_) => {}
Ok(()) => {}
Err(err) => warn!("error emitting event: {}", err),
}
}


@ -168,16 +168,13 @@ impl ManagerStream {
pub async fn next(&mut self) -> Option<Event> {
// We loop polling internal services until an event comes in that needs to be sent to the parent application.
loop {
#[allow(clippy::panic)]
if self.shutdown.load(Ordering::Relaxed) {
panic!("`ManagerStream::next` called after shutdown event. This is a mistake in your application code!");
}
assert!(!self.shutdown.load(Ordering::Relaxed), "`ManagerStream::next` called after shutdown event. This is a mistake in your application code!");
if let Some(event) = self.queued_events.pop_front() {
return Some(event);
}
tokio::select! {
_ = self.discovery_manager.poll() => {
() = self.discovery_manager.poll() => {
continue;
},
event = self.event_stream_rx.recv() => {
@ -328,7 +325,7 @@ impl ManagerStream {
let v = state.connected.get(v);
if v.is_none() {
warn!("Error converting PeerId({v:?}) into RemoteIdentity. This is likely a bug in P2P.")
warn!("Error converting PeerId({v:?}) into RemoteIdentity. This is likely a bug in P2P.");
}
v.copied()
@ -339,7 +336,7 @@ impl ManagerStream {
response
.send(result)
.map_err(|_| {
error!("Error sending response to `GetConnectedPeers` request! Sending was dropped!")
error!("Error sending response to `GetConnectedPeers` request! Sending was dropped!");
})
.ok();
}
@ -365,7 +362,7 @@ impl ManagerStream {
.unwrap_or_else(PoisonError::into_inner);
state.config = config;
ManagerStream::refresh_listeners(&mut self.swarm, &mut state);
Self::refresh_listeners(&mut self.swarm, &mut state);
if !state.config.enabled {
if let Some(mdns) = self.discovery_manager.mdns.take() {
@ -380,7 +377,7 @@ impl ManagerStream {
) {
Ok(mdns) => {
self.discovery_manager.mdns = Some(mdns);
self.discovery_manager.do_advertisement()
self.discovery_manager.do_advertisement();
}
Err(err) => {
error!("error starting mDNS service: {err:?}");
@ -397,7 +394,7 @@ impl ManagerStream {
ManagerStreamAction::Shutdown(tx) => {
info!("Shutting down P2P Manager...");
self.discovery_manager.shutdown();
tx.send(()).unwrap_or_else(|_| {
tx.send(()).unwrap_or_else(|()| {
warn!("Error sending shutdown signal to P2P Manager!");
});


@ -8,7 +8,7 @@ use uuid::Uuid;
pub mod decode {
use crate::spacetunnel::IdentityErr;
use super::*;
use super::{Error, Uuid};
use tokio::io::{AsyncRead, AsyncReadExt};
#[derive(Error, Debug)]
@ -52,7 +52,7 @@ pub mod decode {
}
pub mod encode {
use super::*;
use super::Uuid;
/// Serialize uuid as its fixed size data.
pub fn uuid(buf: &mut Vec<u8>, uuid: &Uuid) {
@ -61,20 +61,16 @@ pub mod encode {
/// Serialize string as its u16 length and data.
pub fn string(buf: &mut Vec<u8>, s: &str) {
#[allow(clippy::panic)] // TODO: Remove this panic
if s.len() > u16::MAX as usize {
panic!("String is too long!"); // TODO: Chunk this so it will never error
}
// TODO: Chunk this so it will never error
assert!(s.len() <= u16::MAX as usize, "String is too long!");
buf.extend_from_slice(&(s.len() as u16).to_le_bytes());
buf.extend(s.as_bytes());
}
/// Serialize buf as its u32 length and data.
pub fn buf(buf: &mut Vec<u8>, b: &[u8]) {
#[allow(clippy::panic)] // TODO: Remove this panic
if b.len() > u32::MAX as usize {
panic!("Buf is too long!"); // TODO: Chunk this so it will never error
}
// TODO: Chunk this so it will never error
assert!(b.len() <= u32::MAX as usize, "Buf is too long!");
buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
buf.extend(b);
}
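The encode changes above replace the manual `if ... { panic!(...) }` guards with `assert!`, which is what `clippy::manual_assert` suggests, while keeping the TODO about chunking. Roughly, the string case now looks like this (sketch, not the exact module):

// Sketch: a single assert! with a message instead of an if + panic! guard.
fn write_len_prefixed(buf: &mut Vec<u8>, s: &str) {
    assert!(s.len() <= u16::MAX as usize, "String is too long!");
    buf.extend_from_slice(&(s.len() as u16).to_le_bytes());
    buf.extend(s.as_bytes());
}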


@ -13,6 +13,7 @@ pub struct Block<'a> {
}
impl<'a> Block<'a> {
#[must_use]
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::new();
buf.extend_from_slice(&self.offset.to_le_bytes());


@ -13,21 +13,25 @@ impl BlockSize {
stream.read_u32_le().await.map(Self)
}
#[must_use]
pub fn to_bytes(&self) -> [u8; 4] {
self.0.to_le_bytes()
}
#[must_use]
pub fn from_size(size: u64) -> Self {
// TODO: Something like: https://docs.syncthing.net/specs/bep-v1.html#selection-of-block-size
Self(131072) // 128 KiB
Self(131_072) // 128 KiB
}
/// This is super dangerous as it doesn't enforce any assumptions of the protocol and is designed just for tests.
#[cfg(test)]
#[must_use]
pub fn dangerously_new(size: u32) -> Self {
Self(size)
}
#[must_use]
pub fn size(&self) -> u32 {
self.0
}
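The `#[must_use]` additions here (and in the later p2p and sync hunks) are `clippy::must_use_candidate`: callers now get a warning if they discard the returned value. The `131_072` literal is the `unreadable_literal` digit-grouping fix. A small sketch combining both:

// Sketch: #[must_use] on getters plus a digit-grouped 128 KiB constant.
pub struct BlockSize(u32);

impl BlockSize {
    #[must_use]
    pub fn from_size(_size: u64) -> Self {
        Self(131_072) // 128 KiB
    }

    #[must_use]
    pub fn size(&self) -> u32 {
        self.0
    }
}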


@ -1,5 +1,5 @@
//! Spaceblock is a file transfer protocol that uses a block based system to transfer files.
//! This protocol is modelled after SyncThing's BEP protocol. A huge thanks to it's original authors!
//! This protocol is modelled after `SyncThing`'s BEP protocol. A huge thanks to it's original authors!
//! You can read more about it here: <https://docs.syncthing.net/specs/bep-v1.html>
#![allow(unused)] // TODO: This module is still in heavy development!
@ -56,6 +56,7 @@ impl<'a> Msg<'a> {
}
}
#[must_use]
pub fn to_bytes(&self) -> Vec<u8> {
match self {
Msg::Block(block) => {
@ -122,10 +123,12 @@ where
if read == 0 {
#[allow(clippy::panic)] // TODO: Remove panic
if (offset + read as u64) != self.reqs.requests[self.i].size {
// The file may have been modified during sender on the sender and we don't account for that.
panic!("File sending has stopped but it doesn't match the expected length!"); // TODO: Error handling + send error to remote
}
// The file may have been modified during sender on the sender and we don't account for that.
// TODO: Error handling + send error to remote
assert!(
(offset + read as u64) == self.reqs.requests[self.i].size,
"File sending has stopped but it doesn't match the expected length!"
);
return Ok(());
}
@ -207,7 +210,7 @@ where
}
stream
.write_u8(self.cancelled.load(Ordering::Relaxed) as u8)
.write_u8(u8::from(self.cancelled.load(Ordering::Relaxed)))
.await?;
stream.flush().await?;
}
@ -280,7 +283,7 @@ mod tests {
let (mut client, mut server) = tokio::io::duplex(64);
// This is sent out of band of Spaceblock
let block_size = 131072u32;
let block_size = 131_072_u32;
let data = vec![0u8; block_size as usize * 4]; // Let's pacman some RAM
let block_size = BlockSize::dangerously_new(block_size);


@ -34,6 +34,7 @@ impl Range {
}
}
#[must_use]
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::new();
@ -98,6 +99,7 @@ impl SpaceblockRequests {
})
}
#[must_use]
pub fn to_bytes(&self) -> Vec<u8> {
let Self {
id,
@ -105,9 +107,10 @@ impl SpaceblockRequests {
requests,
} = self;
#[allow(clippy::panic)] // TODO: Remove this panic
if requests.len() > 255 {
panic!("Can't Spacedrop more than 255 files at once!");
}
assert!(
requests.len() <= 255,
"Can't Spacedrop more than 255 files at once!"
);
let mut buf = vec![];
encode::uuid(&mut buf, id);
@ -162,6 +165,7 @@ impl SpaceblockRequest {
})
}
#[must_use]
pub fn to_bytes(&self) -> Vec<u8> {
let Self { name, size, range } = self;
let mut buf = Vec::new();


@ -30,7 +30,7 @@ pub const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
#[derive(Debug, Error)]
pub enum OutboundFailure {}
/// SpaceTime is a [`NetworkBehaviour`](libp2p_swarm::NetworkBehaviour) that implements the SpaceTime protocol.
/// `SpaceTime` is a [`NetworkBehaviour`](libp2p_swarm::NetworkBehaviour) that implements the `SpaceTime` protocol.
/// This protocol sits under the application to abstract many complexities of 2 way connections and deals with authentication, chunking, etc.
pub struct SpaceTime {
pub(crate) manager: Arc<Manager>,


@ -93,6 +93,7 @@ impl UnicastStream {
})
}
#[must_use]
pub fn remote_identity(&self) -> RemoteIdentity {
self.remote
}


@ -38,6 +38,7 @@ impl Default for Identity {
}
impl Identity {
#[must_use]
pub fn new() -> Self {
Self::default()
}
@ -50,10 +51,12 @@ impl Identity {
)))
}
#[must_use]
pub fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes().to_vec()
}
#[must_use]
pub fn to_remote_identity(&self) -> RemoteIdentity {
RemoteIdentity(self.0.verifying_key())
}
@ -156,10 +159,12 @@ impl RemoteIdentity {
)?))
}
#[must_use]
pub fn get_bytes(&self) -> [u8; REMOTE_IDENTITY_LEN] {
self.0.to_bytes()
}
#[must_use]
pub fn verifying_key(&self) -> VerifyingKey {
self.0
}


@ -8,10 +8,12 @@ use crate::spacetunnel::{Identity, RemoteIdentity};
pub struct Keypair(ed25519::Keypair);
impl Keypair {
#[must_use]
pub fn generate() -> Self {
Self(ed25519::Keypair::generate())
}
#[must_use]
pub fn to_identity(&self) -> Identity {
// This depends on libp2p implementation details which isn't great
SigningKey::from_keypair_bytes(&self.0.to_bytes())
@ -19,17 +21,20 @@ impl Keypair {
.into()
}
#[must_use]
pub fn to_remote_identity(&self) -> RemoteIdentity {
self.to_identity().to_remote_identity()
}
// TODO: Make this `pub(crate)`
#[must_use]
pub fn peer_id(&self) -> libp2p::PeerId {
let pk: libp2p::identity::PublicKey = self.0.public().into();
libp2p::PeerId::from_public_key(&pk)
}
#[must_use]
pub fn inner(&self) -> libp2p::identity::Keypair {
self.0.clone().into()
}


@ -4,4 +4,4 @@ mod multiaddr;
pub use keypair::*;
pub use metadata::*;
pub(crate) use multiaddr::*;
pub use multiaddr::*;


@ -4,7 +4,7 @@ use libp2p::{multiaddr::Protocol, Multiaddr};
// TODO: Turn these into From/Into impls on a wrapper type
pub(crate) fn quic_multiaddr_to_socketaddr(m: Multiaddr) -> Result<SocketAddr, String> {
pub fn quic_multiaddr_to_socketaddr(m: Multiaddr) -> Result<SocketAddr, String> {
let mut addr_parts = m.iter();
let addr = match addr_parts.next() {
@ -12,8 +12,7 @@ pub(crate) fn quic_multiaddr_to_socketaddr(m: Multiaddr) -> Result<SocketAddr, S
Some(Protocol::Ip6(addr)) => IpAddr::V6(addr),
Some(proto) => {
return Err(format!(
"Invalid multiaddr. Segment 1 found protocol 'Ip4' or 'Ip6' but found '{}'",
proto
"Invalid multiaddr. Segment 1 found protocol 'Ip4' or 'Ip6' but found '{proto}'"
))
}
None => return Err("Invalid multiaddr. Segment 1 missing".to_string()),
@ -23,8 +22,7 @@ pub(crate) fn quic_multiaddr_to_socketaddr(m: Multiaddr) -> Result<SocketAddr, S
Some(Protocol::Udp(port)) => port,
Some(proto) => {
return Err(format!(
"Invalid multiaddr. Segment 2 expected protocol 'Udp' but found '{}'",
proto
"Invalid multiaddr. Segment 2 expected protocol 'Udp' but found '{proto}'"
))
}
None => return Err("Invalid multiaddr. Segment 2 missing".to_string()),
@ -33,7 +31,8 @@ pub(crate) fn quic_multiaddr_to_socketaddr(m: Multiaddr) -> Result<SocketAddr, S
Ok(SocketAddr::new(addr, port))
}
pub(crate) fn socketaddr_to_quic_multiaddr(m: &SocketAddr) -> Multiaddr {
#[must_use]
pub fn socketaddr_to_quic_multiaddr(m: &SocketAddr) -> Multiaddr {
let mut addr = Multiaddr::empty();
match m {
SocketAddr::V4(ip) => addr.push(Protocol::Ip4(*ip.ip())),


@ -16,7 +16,7 @@ impl std::fmt::Display for OperationKind<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OperationKind::Create => write!(f, "c"),
OperationKind::Update(field) => write!(f, "u:{}", field),
OperationKind::Update(field) => write!(f, "u:{field}"),
OperationKind::Delete => write!(f, "d"),
}
}
@ -31,6 +31,7 @@ pub struct RelationOperation {
}
impl RelationOperation {
#[must_use]
pub fn kind(&self) -> OperationKind {
self.data.as_kind()
}
@ -64,6 +65,7 @@ pub struct SharedOperation {
}
impl SharedOperation {
#[must_use]
pub fn kind(&self) -> OperationKind {
self.data.as_kind()
}


@ -2,7 +2,10 @@ use serde_json::{json, Value};
use uhlc::HLC;
use uuid::Uuid;
use crate::*;
use crate::{
CRDTOperation, CRDTOperationType, RelationOperation, RelationOperationData, RelationSyncId,
RelationSyncModel, SharedOperation, SharedOperationData, SharedSyncModel, SyncId,
};
pub trait OperationFactory {
fn get_clock(&self) -> &HLC;


@ -14,10 +14,12 @@ pub fn chain_optional_iter<T>(
.collect()
}
#[must_use]
pub fn uuid_to_bytes(uuid: Uuid) -> Vec<u8> {
uuid.as_bytes().to_vec()
}
#[must_use]
pub fn from_bytes_to_uuid(bytes: &[u8]) -> Uuid {
Uuid::from_slice(bytes).expect("corrupted uuid in database")
}