[ENG-1310, ENG-1300] Spacedrop better testing + fix zero-sized files (#1606)

* unit tests save lives

* Support for zero-sized files

* fix: clippy

---------

Co-authored-by: jake <77554505+brxken128@users.noreply.github.com>
This commit is contained in:
Oscar Beaumont 2023-10-17 17:12:38 +11:00 committed by GitHub
parent a7ad24c641
commit 1b856848e8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 398 additions and 126 deletions

View file

@ -234,7 +234,7 @@ impl P2PManager {
debug!("Received ping from peer '{}'", event.peer_id);
}
Header::Spacedrop(req) => {
let id = Uuid::new_v4(); // TODO: Get ID from the remote
let id = req.id;
let (tx, rx) = oneshot::channel();
info!(
@ -254,7 +254,6 @@ impl P2PManager {
.find(|p| p.peer_id == event.peer_id)
.map(|p| p.metadata.name)
.unwrap_or_else(|| "Unknown".to_string()),
// TODO: If multiple files in request ask user to select a whole directory instead!
files: req
.requests
.iter()
@ -270,7 +269,7 @@ impl P2PManager {
tokio::select! {
_ = sleep(SPACEDROP_TIMEOUT) => {
info!("spacedrop({id}): timeout, rejecting!");
info!("({id}): timeout, rejecting!");
stream.write_all(&[0]).await.unwrap();
stream.flush().await.unwrap();

View file

@ -43,3 +43,4 @@ hex = "0.4.3"
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
uuid = { version = "1.4.1", features = ["v4"] }

View file

@ -14,6 +14,7 @@ impl<'a> Block<'a> {
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::new();
buf.extend_from_slice(&self.offset.to_le_bytes());
debug_assert_eq!(self.data.len(), self.size as usize); // TODO: Should `self.size` be inferred instead?
buf.extend_from_slice(&self.size.to_le_bytes());
buf.extend_from_slice(self.data);
buf
@ -33,6 +34,10 @@ impl<'a> Block<'a> {
// TODO: Ensure `size` is `block_size` or smaller else buffer overflow
if size as usize > data_buf.len() {
return Err(());
}
stream
.read_exact(&mut data_buf[..size as usize])
.await
@ -46,4 +51,46 @@ impl<'a> Block<'a> {
}
}
// TODO: Unit test `Block`
#[cfg(test)]
mod tests {
use std::io::Cursor;
use crate::spaceblock::BlockSize;
use super::*;
#[tokio::test]
async fn test_block() {
let mut req = Block {
offset: 420,
size: 10, // Matches length of string on next line
data: b"Spacedrive".as_ref(),
};
let bytes = req.to_bytes();
let mut data2 = vec![0; req.data.len()];
let req2 = Block::from_stream(&mut Cursor::new(bytes), &mut data2)
.await
.unwrap();
let data = std::mem::take(&mut req.data);
assert_eq!(req, req2);
assert_eq!(data, data2);
}
#[tokio::test]
#[should_panic] // TODO: This currently panics but long term it should have proper error handling
async fn test_block_data_buf_overflow() {
let mut req = Block {
offset: 420,
size: 10, // Matches length of string on next line
data: b"Spacedrive".as_ref(),
};
let bytes = req.to_bytes();
let mut data2 = vec![0; 5]; // Length smaller than `req.data.len()`
let req2 = Block::from_stream(&mut Cursor::new(bytes), &mut data2)
.await
.unwrap();
let data = std::mem::take(&mut req.data);
assert_eq!(req, req2);
assert_eq!(data, data2);
}
}

View file

@ -32,3 +32,20 @@ impl BlockSize {
self.0
}
}
#[cfg(test)]
mod tests {
use std::io::Cursor;
use super::*;
#[tokio::test]
async fn test_block_size() {
let req = BlockSize::dangerously_new(5);
let bytes = req.to_bytes();
let req2 = BlockSize::from_stream(&mut Cursor::new(bytes))
.await
.unwrap();
assert_eq!(req, req2);
}
}

View file

@ -121,13 +121,11 @@ where
); // SAFETY: Percent must be between 0 and 100
if read == 0 {
// TODO: Properly handle zero-sized files
if (offset + read as u64) != self.reqs.requests[self.i].size {
panic!("U dun goofed"); // TODO: Error handling
// The file may have been modified during sender on the sender and we don't account for that.
panic!("File sending has stopped but it doesn't match the expected length!"); // TODO: Error handling + send error to remote
}
// TODO: Should indicate to the other side it's done???
return;
}
@ -174,8 +172,19 @@ where
let mut data_buf = vec![0u8; self.reqs.block_size.size() as usize];
let mut offset: u64 = 0;
if self.reqs.requests[self.i].size == 0 {
self.i += 1;
return;
}
// TODO: Prevent loop being a DOS vector
loop {
if self.cancelled.load(Ordering::Relaxed) {
stream.write_u8(1).await.unwrap(); // TODO: Error handling
stream.flush().await.unwrap(); // TODO: Error handling
return;
}
// TODO: Timeout if nothing is being received
let msg = Msg::from_stream(stream, &mut data_buf).await.unwrap(); // TODO: Error handling
match msg {
@ -221,140 +230,248 @@ where
}
}
// #[cfg(test)]
// mod tests {
// use std::io::Cursor;
#[cfg(test)]
mod tests {
use std::{io::Cursor, mem};
// use tokio::sync::oneshot;
use tokio::sync::oneshot;
use uuid::Uuid;
// use super::*;
use super::*;
// #[tokio::test]
// async fn test_spaceblock_request() {
// let req = SpaceblockRequest {
// name: "Demo".to_string(),
// size: 42069,
// block_size: BlockSize::from_size(42069),
// range: Range::Full,
// };
#[tokio::test]
async fn test_spaceblock_single_block() {
let (mut client, mut server) = tokio::io::duplex(64);
// let bytes = req.to_bytes();
// let req2 = SpaceblockRequest::from_stream(&mut Cursor::new(bytes))
// .await
// .unwrap();
// assert_eq!(req, req2);
// This is sent out of band of Spaceblock
let data = b"Spacedrive".to_vec();
let req = SpaceblockRequests {
id: Uuid::new_v4(),
block_size: BlockSize::from_size(data.len() as u64),
requests: vec![SpaceblockRequest {
name: "Demo".to_string(),
size: data.len() as u64,
range: Range::Full,
}],
};
// let req = SpaceblockRequest {
// name: "Demo".to_string(),
// size: 42069,
// block_size: BlockSize::from_size(42069),
// range: Range::Partial(0..420),
// };
let (tx, rx) = oneshot::channel();
tokio::spawn({
let req = req.clone();
let data = data.clone();
async move {
let file = BufReader::new(Cursor::new(data));
tx.send(()).unwrap();
Transfer::new(&req, |_| {}, &Default::default())
.send(&mut client, file)
.await;
}
});
// let bytes = req.to_bytes();
// let req2 = SpaceblockRequest::from_stream(&mut Cursor::new(bytes))
// .await
// .unwrap();
// assert_eq!(req, req2);
// }
rx.await.unwrap();
// #[tokio::test]
// async fn test_spaceblock_single_block() {
// let (mut client, mut server) = tokio::io::duplex(64);
let mut result = Vec::new();
Transfer::new(&req, |_| {}, &Default::default())
.receive(&mut server, &mut result)
.await;
assert_eq!(result, data);
}
// // This is sent out of band of Spaceblock
// let data = b"Spacedrive".to_vec();
// let req = SpaceblockRequest {
// name: "Demo".to_string(),
// size: data.len() as u64,
// block_size: BlockSize::from_size(data.len() as u64),
// range: Range::Full,
// };
// https://github.com/spacedriveapp/spacedrive/pull/942
#[tokio::test]
async fn test_spaceblock_multiple_blocks() {
let (mut client, mut server) = tokio::io::duplex(64);
// let (tx, rx) = oneshot::channel();
// tokio::spawn({
// let req = req.clone();
// let data = data.clone();
// async move {
// let file = BufReader::new(Cursor::new(data));
// tx.send(()).unwrap();
// Transfer::new(&req, |_| {}, &Default::default())
// .send(&mut client, file)
// .await;
// }
// });
// This is sent out of band of Spaceblock
let block_size = 131072u32;
let data = vec![0u8; block_size as usize * 4]; // Let's pacman some RAM
let block_size = BlockSize::dangerously_new(block_size);
// rx.await.unwrap();
let req = SpaceblockRequests {
id: Uuid::new_v4(),
block_size,
requests: vec![SpaceblockRequest {
name: "Demo".to_string(),
size: data.len() as u64,
range: Range::Full,
}],
};
// let mut result = Vec::new();
// Transfer::new(&req, |_| {}, &Default::default())
// .receive(&mut server, &mut result)
// .await;
// assert_eq!(result, data);
// }
let (tx, rx) = oneshot::channel();
tokio::spawn({
let req = req.clone();
let data = data.clone();
async move {
let file = BufReader::new(Cursor::new(data));
tx.send(()).unwrap();
Transfer::new(&req, |_| {}, &Default::default())
.send(&mut client, file)
.await;
}
});
// // https://github.com/spacedriveapp/spacedrive/pull/942
// #[tokio::test]
// async fn test_spaceblock_multiple_blocks() {
// let (mut client, mut server) = tokio::io::duplex(64);
rx.await.unwrap();
// // This is sent out of band of Spaceblock
// let block_size = 131072u32;
// let data = vec![0u8; block_size as usize * 4]; // Let's pacman some RAM
// let block_size = BlockSize::dangerously_new(block_size);
let mut result = Vec::new();
Transfer::new(&req, |_| {}, &Default::default())
.receive(&mut server, &mut result)
.await;
assert_eq!(result, data);
}
// let req = SpaceblockRequest {
// name: "Demo".to_string(),
// size: data.len() as u64,
// block_size,
// range: Range::Full,
// };
#[tokio::test]
async fn test_transfer_receiver_cancelled() {
let (mut client, mut server) = tokio::io::duplex(64);
// let (tx, rx) = oneshot::channel();
// tokio::spawn({
// let req = req.clone();
// let data = data.clone();
// async move {
// let file = BufReader::new(Cursor::new(data));
// tx.send(()).unwrap();
// Transfer::new(&req, |_| {}, &Default::default())
// .send(&mut client, file)
// .await;
// }
// });
// This is sent out of band of Spaceblock
let block_size = 25u32;
let data = vec![0u8; block_size as usize];
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of harcoding it
// rx.await.unwrap();
let req = SpaceblockRequests {
id: Uuid::new_v4(),
block_size,
requests: vec![SpaceblockRequest {
name: "Demo".to_string(),
size: data.len() as u64,
range: Range::Full,
}],
};
// let mut result = Vec::new();
// Transfer::new(&req, |_| {}, &Default::default())
// .receive(&mut server, &mut result)
// .await;
// assert_eq!(result, data);
// }
let (tx, rx) = oneshot::channel();
tokio::spawn({
let req = req.clone();
let data = data.clone();
async move {
let file = BufReader::new(Cursor::new(data));
tx.send(()).unwrap();
// // TODO: Unit test the condition when the receiver sets the `cancelled` flag
Transfer::new(&req, |_| {}, &Arc::new(AtomicBool::new(true)))
.send(&mut client, file)
.await;
}
});
// // TODO: Unit test the condition when the sender sets the `cancelled` flag
rx.await.unwrap();
// #[tokio::test]
// async fn test_msg() {
// let block = Block {
// offset: 0,
// size: 10,
// data: b"Spacedrive".as_ref(),
// };
// let msg = Msg::Block(block);
// let bytes = msg.to_bytes();
// let msg2 = Msg::from_stream(&mut Cursor::new(bytes), &mut [0u8; 64])
// .await
// .unwrap();
// assert_eq!(msg, msg2);
let mut result = Vec::new();
Transfer::new(&req, |_| {}, &Default::default())
.receive(&mut server, &mut result)
.await;
assert_eq!(result, Vec::<u8>::new()); // Cancelled by sender so no data
}
// let msg = Msg::Cancelled;
// let bytes = msg.to_bytes();
// let msg2 = Msg::from_stream(&mut Cursor::new(bytes), &mut [0u8; 64])
// .await
// .unwrap();
// assert_eq!(msg, msg2);
// }
// }
#[tokio::test]
async fn test_transfer_sender_cancelled() {
let (mut client, mut server) = tokio::io::duplex(64);
// This is sent out of band of Spaceblock
let block_size = 25u32;
let data = vec![0u8; block_size as usize];
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of harcoding it
let req = SpaceblockRequests {
id: Uuid::new_v4(),
block_size,
requests: vec![SpaceblockRequest {
name: "Demo".to_string(),
size: data.len() as u64,
range: Range::Full,
}],
};
let (tx, rx) = oneshot::channel();
tokio::spawn({
let req = req.clone();
let data = data.clone();
async move {
let file = BufReader::new(Cursor::new(data));
tx.send(()).unwrap();
Transfer::new(&req, |_| {}, &Default::default())
.send(&mut client, file)
.await;
}
});
rx.await.unwrap();
let mut result = Vec::new();
Transfer::new(&req, |_| {}, &Arc::new(AtomicBool::new(true)))
.receive(&mut server, &mut result)
.await;
assert_eq!(result, Vec::<u8>::new()); // Cancelled by sender so no data
}
// https://linear.app/spacedriveapp/issue/ENG-1300/spaceblock-doesnt-like-zero-sized-files
#[tokio::test]
async fn test_spaceblock_zero_sized_file() {
let (mut client, mut server) = tokio::io::duplex(64);
// This is sent out of band of Spaceblock
let block_size = 25u32;
let data = vec![0u8; 0]; // Zero sized file
let block_size = BlockSize::dangerously_new(block_size); // TODO: Determine it using proper algo instead of harcoding it
let req = SpaceblockRequests {
id: Uuid::new_v4(),
block_size,
requests: vec![SpaceblockRequest {
name: "Demo".to_string(),
size: data.len() as u64,
range: Range::Full,
}],
};
let (tx, rx) = oneshot::channel();
tokio::spawn({
let req = req.clone();
let data = data.clone();
async move {
let file = BufReader::new(Cursor::new(data));
tx.send(()).unwrap();
Transfer::new(&req, |_| {}, &Default::default())
.send(&mut client, file)
.await;
}
});
rx.await.unwrap();
let mut result = Vec::new();
Transfer::new(&req, |_| {}, &Default::default())
.receive(&mut server, &mut result)
.await;
assert_eq!(result, Vec::<u8>::new()); // Cancelled by sender so no data
}
#[tokio::test]
async fn test_msg() {
let block = Block {
offset: 0,
size: 10,
data: b"Spacedrive".as_ref(),
};
let data_len = block.data.len();
let mut msg = Msg::Block(block);
let bytes = msg.to_bytes();
let mut data2 = vec![0; data_len];
let msg2 = Msg::from_stream(&mut Cursor::new(bytes), &mut data2)
.await
.unwrap();
let data = mem::take(match &mut msg {
Msg::Block(block) => &mut block.data,
_ => unreachable!(),
}); // We decode the data into
assert_eq!(msg, msg2);
assert_eq!(data, data2);
let msg = Msg::Cancelled;
let bytes = msg.to_bytes();
let msg2 = Msg::from_stream(&mut Cursor::new(bytes), &mut [0u8; 64])
.await
.unwrap();
assert_eq!(msg, msg2);
}
}

View file

@ -103,8 +103,9 @@ impl SpaceblockRequests {
panic!("Can't Spacedrop more than 255 files at once!");
}
let mut buf = block_size.to_bytes().to_vec();
let mut buf = vec![];
encode::uuid(&mut buf, id);
buf.append(&mut block_size.to_bytes().to_vec());
buf.push(requests.len() as u8);
for request in requests {
buf.extend_from_slice(&request.to_bytes());
@ -161,4 +162,94 @@ impl SpaceblockRequest {
}
}
// TODO: This file is missing protocol unit tests
#[cfg(test)]
mod tests {
use std::io::Cursor;
use super::*;
#[tokio::test]
async fn test_range() {
let req = Range::Full;
let bytes = req.to_bytes();
let req2 = Range::from_stream(&mut Cursor::new(bytes)).await.unwrap();
assert_eq!(req, req2);
let req = Range::Partial(0..10);
let bytes = req.to_bytes();
let req2 = Range::from_stream(&mut Cursor::new(bytes)).await.unwrap();
assert_eq!(req, req2);
}
#[tokio::test]
async fn test_spaceblock_requests_empty() {
let req = SpaceblockRequests {
id: Uuid::new_v4(),
block_size: BlockSize::from_size(42069),
requests: vec![],
};
let bytes = req.to_bytes();
let req2 = SpaceblockRequests::from_stream(&mut Cursor::new(bytes))
.await
.unwrap();
assert_eq!(req, req2);
}
#[tokio::test]
async fn test_spaceblock_requests_one() {
let req = SpaceblockRequests {
id: Uuid::new_v4(),
block_size: BlockSize::from_size(42069),
requests: vec![SpaceblockRequest {
name: "Demo".to_string(),
size: 42069,
range: Range::Full,
}],
};
let bytes = req.to_bytes();
let req2 = SpaceblockRequests::from_stream(&mut Cursor::new(bytes))
.await
.unwrap();
assert_eq!(req, req2);
let req = SpaceblockRequest {
name: "Demo".to_string(),
size: 42069,
range: Range::Partial(0..420),
};
let bytes = req.to_bytes();
let req2 = SpaceblockRequest::from_stream(&mut Cursor::new(bytes))
.await
.unwrap();
assert_eq!(req, req2);
}
#[tokio::test]
async fn test_spaceblock_requests_many() {
let req = SpaceblockRequests {
id: Uuid::new_v4(),
block_size: BlockSize::from_size(42069),
requests: vec![
SpaceblockRequest {
name: "Demo".to_string(),
size: 42069,
range: Range::Full,
},
SpaceblockRequest {
name: "Demo2".to_string(),
size: 420,
range: Range::Full,
},
],
};
let bytes = req.to_bytes();
let req2 = SpaceblockRequests::from_stream(&mut Cursor::new(bytes))
.await
.unwrap();
assert_eq!(req, req2);
}
}