Block size (#2377)

Block size + some Clippy
This commit is contained in:
Oscar Beaumont 2024-04-24 16:27:31 +08:00 committed by GitHub
parent e009a0478c
commit 643bd3a142
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 137 additions and 87 deletions

View file

@ -164,7 +164,7 @@ async fn open_trash_in_os_explorer() -> Result<(), ()> {
.wait()
.map_err(|err| error!("Error opening trash: {err:#?}"))?;
return Ok(());
Ok(())
}
#[cfg(target_os = "windows")]

View file

@ -4,7 +4,7 @@ use axum::{
extract::{FromRequestParts, State},
headers::{authorization::Basic, Authorization},
http::Request,
middleware::{self, Next},
middleware::Next,
response::{IntoResponse, Response},
routing::get,
TypedHeader,
@ -24,12 +24,13 @@ pub struct AppState {
auth: HashMap<String, SecStr>,
}
#[allow(unused)]
async fn basic_auth<B>(
State(state): State<AppState>,
request: Request<B>,
next: Next<B>,
) -> Response {
let request = if state.auth.len() != 0 {
let request = if !state.auth.is_empty() {
let (mut parts, body) = request.into_parts();
let Ok(TypedHeader(Authorization(hdr))) =
@ -46,7 +47,7 @@ async fn basic_auth<B>(
if state
.auth
.get(hdr.username())
.and_then(|pass| Some(*pass == SecStr::from(hdr.password())))
.map(|pass| *pass == SecStr::from(hdr.password()))
!= Some(true)
{
return Response::builder()
@ -110,7 +111,7 @@ async fn main() {
.into_iter()
.enumerate()
.filter_map(|(i, s)| {
if s.len() == 0 {
if s.is_empty() {
return None;
}
@ -133,7 +134,7 @@ async fn main() {
};
// We require credentials in production builds (unless explicitly disabled)
if auth.len() == 0 && !disabled {
if auth.is_empty() && !disabled {
#[cfg(not(debug_assertions))]
{
warn!("The 'SD_AUTH' environment variable is not set!");
@ -143,6 +144,7 @@ async fn main() {
}
}
#[allow(unused)]
let state = AppState { auth };
let (node, router) = match Node::new(

View file

@ -1,4 +1,4 @@
use std::{collections::HashMap, path::PathBuf, process::Command};
use std::{collections::HashMap, path::PathBuf};
use crate::{
api::{locations::ExplorerItem, utils::library},
@ -16,7 +16,7 @@ use crate::{
use opendal::{services::Fs, Operator};
use sd_cache::{CacheNode, Model, Normalise, Reference};
use sd_core_indexer_rules::seed::{no_hidden, no_os_protected};
use sd_core_indexer_rules::seed::no_hidden;
use sd_core_indexer_rules::IndexerRule;
use sd_core_prisma_helpers::{file_path_with_object, object_with_file_paths};
use sd_file_ext::kind::ObjectKind;
@ -28,7 +28,7 @@ use futures::StreamExt;
use rspc::{alpha::AlphaRouter, ErrorCode};
use serde::{Deserialize, Serialize};
use specta::Type;
use tracing::{error, info, warn};
use tracing::{error, warn};
pub mod file_path;
pub mod media_data;
@ -245,7 +245,7 @@ pub fn mount() -> AlphaRouter<Ctx> {
};
}
if to_generate.len() > 0 {
if !to_generate.is_empty() {
node.thumbnailer
.new_ephemeral_thumbnails_batch(BatchToProcess::new(
to_generate,

View file

@ -62,7 +62,7 @@ fn default_as_true() -> bool {
}
fn skip_if_true(value: &bool) -> bool {
*value == true
*value
}
#[derive(Debug, Clone, Serialize, Deserialize, Type)]

View file

@ -150,11 +150,11 @@ impl P2PManager {
info!(
"Setting quic ipv4 listener to: {:?}",
config.p2p.ipv4.then(|| port)
config.p2p.ipv4.then_some(port)
);
if let Err(err) = self
.quic
.set_ipv4_enabled(config.p2p.ipv4.then(|| port))
.set_ipv4_enabled(config.p2p.ipv4.then_some(port))
.await
{
error!("Failed to enabled quic ipv4 listener: {err}");
@ -163,16 +163,16 @@ impl P2PManager {
self.listener_errors
.lock()
.unwrap_or_else(PoisonError::into_inner)
.ipv4 = Some(format!("{err}"));
.ipv4 = Some(err.to_string());
}
info!(
"Setting quic ipv6 listener to: {:?}",
config.p2p.ipv6.then(|| port)
config.p2p.ipv6.then_some(port)
);
if let Err(err) = self
.quic
.set_ipv6_enabled(config.p2p.ipv6.then(|| port))
.set_ipv6_enabled(config.p2p.ipv6.then_some(port))
.await
{
error!("Failed to enabled quic ipv6 listener: {err}");
@ -181,7 +181,7 @@ impl P2PManager {
self.listener_errors
.lock()
.unwrap_or_else(PoisonError::into_inner)
.ipv6 = Some(format!("{err}"));
.ipv6 = Some(err.to_string());
}
let should_revert = match config.p2p_discovery {

View file

@ -85,7 +85,7 @@ pub async fn spacedrop(
debug!("({id}): connected, sending header");
let header = Header::Spacedrop(SpaceblockRequests {
id,
block_size: BlockSize::from_size(total_length),
block_size: BlockSize::from_file_size(total_length),
requests,
});
if let Err(err) = stream.write_all(&header.to_bytes()).await {

View file

@ -1,39 +1,97 @@
#![allow(non_upper_case_globals)]
use std::io;
use tokio::io::{AsyncRead, AsyncReadExt};
/// TODO
const KiB: u32 = 1024;
const MiB: u32 = 1024 * KiB;
const GiB: u32 = 1024 * MiB;
/// defines the size of each chunk of data that is sent
///
/// We store this in an enum so it's super efficient.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockSize(u32); // Max block size is gonna be 3.9GB which is stupidly overkill
pub enum BlockSize {
_128KiB,
_256KiB,
_512KiB,
_1MiB,
_2MiB,
_4MiB,
_8MiB,
_16MiB,
}
impl BlockSize {
// TODO: Validating `BlockSize` are multiple of 2, i think. Idk why but BEP does it.
pub async fn from_stream(stream: &mut (impl AsyncRead + Unpin)) -> io::Result<Self> {
stream.read_u32_le().await.map(Self)
}
/// Determine the optimal block size for a given file size
#[must_use]
pub fn to_bytes(&self) -> [u8; 4] {
self.0.to_le_bytes()
}
#[must_use]
pub fn from_size(size: u64) -> Self {
// TODO: Something like: https://docs.syncthing.net/specs/bep-v1.html#selection-of-block-size
Self(131_072) // 128 KiB
}
/// This is super dangerous as it doesn't enforce any assumptions of the protocol and is designed just for tests.
#[cfg(test)]
#[must_use]
pub fn dangerously_new(size: u32) -> Self {
Self(size)
pub fn from_file_size(size: u64) -> Self {
// Values directly copied from https://docs.syncthing.net/specs/bep-v1.html#selection-of-block-size
if size < 250 * u64::from(MiB) {
return Self::_128KiB;
} else if size < 500 * u64::from(MiB) {
return Self::_256KiB;
} else if size < u64::from(GiB) {
return Self::_512KiB;
} else if size < 2 * u64::from(GiB) {
return Self::_1MiB;
} else if size < 4 * u64::from(GiB) {
return Self::_2MiB;
} else if size < 8 * u64::from(GiB) {
return Self::_4MiB;
} else if size < 16 * u64::from(GiB) {
return Self::_8MiB;
}
Self::_16MiB
}
/// Get the size of the block in bytes
#[must_use]
pub fn size(&self) -> u32 {
self.0
match self {
Self::_128KiB => 128 * KiB,
Self::_256KiB => 256 * KiB,
Self::_512KiB => 512 * KiB,
Self::_1MiB => MiB,
Self::_2MiB => 2 * MiB,
Self::_4MiB => 4 * MiB,
Self::_8MiB => 8 * MiB,
Self::_16MiB => 16 * MiB,
}
}
pub async fn from_stream(stream: &mut (impl AsyncRead + Unpin)) -> io::Result<Self> {
// WARNING: Be careful modifying this cause it may break backwards/forwards-compatibility
match stream.read_u8().await? {
0 => Ok(Self::_128KiB),
1 => Ok(Self::_256KiB),
2 => Ok(Self::_512KiB),
3 => Ok(Self::_1MiB),
4 => Ok(Self::_2MiB),
5 => Ok(Self::_4MiB),
6 => Ok(Self::_8MiB),
7 => Ok(Self::_16MiB),
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid block size",
)),
}
}
#[must_use]
pub fn to_bytes(&self) -> [u8; 1] {
// WARNING: Be careful modifying this cause it may break backwards/forwards-compatibility
[match self {
Self::_128KiB => 0,
Self::_256KiB => 1,
Self::_512KiB => 2,
Self::_1MiB => 3,
Self::_2MiB => 4,
Self::_4MiB => 5,
Self::_8MiB => 6,
Self::_16MiB => 7,
}]
}
}
@ -45,7 +103,14 @@ mod tests {
#[tokio::test]
async fn test_block_size() {
let req = BlockSize::dangerously_new(5);
let req = BlockSize::_128KiB;
let bytes = req.to_bytes();
let req2 = BlockSize::from_stream(&mut Cursor::new(bytes))
.await
.unwrap();
assert_eq!(req, req2);
let req = BlockSize::_16MiB;
let bytes = req.to_bytes();
let req2 = BlockSize::from_stream(&mut Cursor::new(bytes))
.await

View file

@ -1,32 +1,22 @@
//! TODO
// TODO: Clippy lints here
//! Spaceblock is a file transfer protocol that uses a block based system to transfer files.
//! This protocol is modelled after `SyncThing`'s BEP protocol. A huge thanks to it's original authors!
//! A protocol for efficiently and securely transferring files between peers.
//!
//! Goals:
//! - Fast - Transfer files as quickly as possible
//! - Safe - Verify the files integrity on both ends
//!
//! This protocol was heavily inspired by SyncThing's Block Exchange Protocol protocol although it's not compatible.
//! You can read more about it here: <https://docs.syncthing.net/specs/bep-v1.html>
#![allow(unused)] // TODO: This module is still in heavy development!
//!
#![warn(clippy::unwrap_used, clippy::panic)]
use std::{
io,
marker::PhantomData,
path::{Path, PathBuf},
string::FromUtf8Error,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
sync::atomic::{AtomicBool, Ordering},
};
use thiserror::Error;
use tokio::{
fs::File,
io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader},
};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tracing::debug;
use sd_p2p::UnicastStream;
use sd_p2p_proto::{decode, encode};
mod block;
mod block_size;
mod sb_request;
@ -123,9 +113,8 @@ where
); // SAFETY: Percent must be between 0 and 100
if read == 0 {
#[allow(clippy::panic)] // TODO: Remove panic
// The file may have been modified during sender on the sender and we don't account for that.
// TODO: Error handling + send error to remote
// The file may have been modified during sender on the sender and we don't account for that.
// TODO: Error handling + send error to remote
assert!(
(offset + read as u64) == self.reqs.requests[self.i].size,
"File sending has stopped but it doesn't match the expected length!"
@ -236,9 +225,9 @@ where
#[cfg(test)]
mod tests {
use std::{io::Cursor, mem};
use std::{io::Cursor, mem, sync::Arc};
use tokio::sync::oneshot;
use tokio::{io::BufReader, sync::oneshot};
use uuid::Uuid;
use super::*;
@ -251,7 +240,7 @@ mod tests {
let data = b"Spacedrive".to_vec();
let req = SpaceblockRequests {
id: Uuid::new_v4(),
block_size: BlockSize::from_size(data.len() as u64),
block_size: BlockSize::from_file_size(data.len() as u64),
requests: vec![SpaceblockRequest {
name: "Demo".to_string(),
size: data.len() as u64,
@ -287,9 +276,8 @@ mod tests {
let (mut client, mut server) = tokio::io::duplex(64);
// This is sent out of band of Spaceblock
let block_size = 131_072_u32;
let data = vec![0u8; block_size as usize * 4]; // Let's pacman some RAM
let block_size = BlockSize::dangerously_new(block_size);
let block_size = BlockSize::_128KiB;
let data = vec![0u8; block_size.size() as usize * 4]; // Let's pacman some RAM
let req = SpaceblockRequests {
id: Uuid::new_v4(),
@ -328,9 +316,8 @@ mod tests {
let (mut client, mut server) = tokio::io::duplex(64);
// This is sent out of band of Spaceblock
let block_size = 25u32;
let data = vec![0u8; block_size as usize];
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of hardcoding it
let block_size = BlockSize::_128KiB;
let data = vec![0u8; block_size.size() as usize];
let req = SpaceblockRequests {
id: Uuid::new_v4(),
@ -370,9 +357,8 @@ mod tests {
let (mut client, mut server) = tokio::io::duplex(64);
// This is sent out of band of Spaceblock
let block_size = 25u32;
let data = vec![0u8; block_size as usize];
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of hardcoding it
let block_size = BlockSize::_128KiB;
let data = vec![0u8; block_size.size() as usize];
let req = SpaceblockRequests {
id: Uuid::new_v4(),
@ -413,9 +399,8 @@ mod tests {
let (mut client, mut server) = tokio::io::duplex(64);
// This is sent out of band of Spaceblock
let block_size = 25u32;
let block_size = BlockSize::_128KiB;
let data = vec![0u8; 0]; // Zero sized file
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of hardcoding it
let req = SpaceblockRequests {
id: Uuid::new_v4(),

View file

@ -88,7 +88,7 @@ impl SpaceblockRequests {
.map_err(SpaceblockRequestsError::InvalidLen)?;
let mut requests = Vec::new();
for i in 0..size {
for _i in 0..size {
requests.push(SpaceblockRequest::from_stream(stream).await?);
}
@ -106,7 +106,6 @@ impl SpaceblockRequests {
block_size,
requests,
} = self;
#[allow(clippy::panic)] // TODO: Remove this panic
assert!(
requests.len() <= 255,
"Can't Spacedrop more than 255 files at once!"
@ -167,10 +166,9 @@ impl SpaceblockRequest {
#[must_use]
pub fn to_bytes(&self) -> Vec<u8> {
let Self { name, size, range } = self;
let mut buf = Vec::new();
encode::string(&mut buf, name);
encode::string(&mut buf, &self.name);
buf.extend_from_slice(&self.size.to_le_bytes());
buf.extend_from_slice(&self.range.to_bytes());
buf
@ -215,7 +213,7 @@ mod tests {
async fn test_spaceblock_requests_one() {
let req = SpaceblockRequests {
id: Uuid::new_v4(),
block_size: BlockSize::from_size(42069),
block_size: BlockSize::from_file_size(42069),
requests: vec![SpaceblockRequest {
name: "Demo".to_string(),
size: 42069,
@ -246,7 +244,7 @@ mod tests {
async fn test_spaceblock_requests_many() {
let req = SpaceblockRequests {
id: Uuid::new_v4(),
block_size: BlockSize::from_size(42069),
block_size: BlockSize::from_file_size(42069),
requests: vec![
SpaceblockRequest {
name: "Demo".to_string(),