[ENG-568] Spacedrop Progress (#956)

* `Transfer` struct

* progress moment

* fixes
This commit is contained in:
Oscar Beaumont 2023-06-18 10:40:51 +02:00 committed by GitHub
parent 2d6228115c
commit fd1afd8500
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 135 additions and 74 deletions

1
Cargo.lock generated
View file

@ -6939,6 +6939,7 @@ dependencies = [
"tempfile",
"thiserror",
"tokio",
"tokio-stream",
"tracing 0.2.0",
"tracing-appender",
"tracing-subscriber 0.3.0",

View file

@ -91,6 +91,7 @@ strum_macros = "0.24"
regex = "1.8.4"
hex = "0.4.3"
int-enum = "0.5.0"
tokio-stream = "0.1.14"
[target.'cfg(windows)'.dependencies.winapi-util]
version = "0.1.5"

View file

@ -62,13 +62,16 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.procedure("acceptSpacedrop", {
R.mutation(|ctx, (id, path): (Uuid, Option<String>)| async move {
match path {
Some(path) => {
ctx.p2p.accept_spacedrop(id, path).await;
}
None => {
ctx.p2p.reject_spacedrop(id).await;
}
Some(path) => ctx.p2p.accept_spacedrop(id, path).await,
None => ctx.p2p.reject_spacedrop(id).await,
}
})
})
.procedure("spacedropProgress", {
R.subscription(|ctx, id: Uuid| async move {
ctx.p2p.spacedrop_progress(id).await.ok_or_else(|| {
rspc::Error::new(ErrorCode::BadRequest, "Spacedrop not found!".into())
})
})
})
}

View file

@ -8,8 +8,9 @@ use std::{
time::{Duration, Instant},
};
use futures::Stream;
use sd_p2p::{
spaceblock::{self, BlockSize, SpacedropRequest},
spaceblock::{BlockSize, SpacedropRequest, Transfer},
spacetime::SpaceTimeStream,
Event, Manager, ManagerError, MetadataManager, PeerId,
};
@ -56,6 +57,7 @@ pub struct P2PManager {
pub manager: Arc<Manager<PeerMetadata>>,
spacedrop_pairing_reqs: Arc<Mutex<HashMap<Uuid, oneshot::Sender<Option<String>>>>>,
pub metadata_manager: Arc<MetadataManager<PeerMetadata>>,
pub spacedrop_progress: Arc<Mutex<HashMap<Uuid, broadcast::Sender<u8>>>>,
}
impl P2PManager {
@ -82,10 +84,12 @@ impl P2PManager {
let (tx2, rx2) = broadcast::channel(100);
let spacedrop_pairing_reqs = Arc::new(Mutex::new(HashMap::new()));
let spacedrop_progress = Arc::new(Mutex::new(HashMap::new()));
tokio::spawn({
let events = tx.clone();
// let sync_events = tx2.clone();
let spacedrop_pairing_reqs = spacedrop_pairing_reqs.clone();
let spacedrop_progress = spacedrop_progress.clone();
async move {
let mut shutdown = false;
@ -113,6 +117,7 @@ impl P2PManager {
let events = events.clone();
let sync_events = tx2.clone();
let spacedrop_pairing_reqs = spacedrop_pairing_reqs.clone();
let spacedrop_progress = spacedrop_progress.clone();
tokio::spawn(async move {
let header = Header::from_stream(&mut event.stream).await.unwrap();
@ -137,6 +142,12 @@ impl P2PManager {
spacedrop_pairing_reqs.lock().await.insert(id, tx);
let (process_tx, _) = broadcast::channel(100);
spacedrop_progress
.lock()
.await
.insert(id, process_tx.clone());
if let Err(_) = events.send(P2PEvent::SpacedropRequest {
id,
peer_id: event.peer_id,
@ -162,7 +173,9 @@ impl P2PManager {
let f = File::create(file_path).await.unwrap();
spaceblock::receive(&mut stream, f, &req).await;
Transfer::new(&req, |percent| {
process_tx.send(percent).ok();
}).receive(&mut stream, f).await;
info!("spacedrop({id}): complete");
}
@ -217,6 +230,7 @@ impl P2PManager {
manager,
spacedrop_pairing_reqs,
metadata_manager,
spacedrop_progress,
});
// TODO: Probs remove this once connection timeout/keepalive are working correctly
@ -286,7 +300,13 @@ impl P2PManager {
}
// TODO: Proper error handling
pub async fn big_bad_spacedrop(&self, peer_id: PeerId, path: PathBuf) -> Result<(), ()> {
pub async fn big_bad_spacedrop(
&self,
peer_id: PeerId,
path: PathBuf,
) -> Result<Option<Uuid>, ()> {
let id = Uuid::new_v4();
let (tx, _) = broadcast::channel(25);
let mut stream = self.manager.stream(peer_id).await.map_err(|_| ())?; // TODO: handle providing incorrect peer id
let file = File::open(&path).await.map_err(|_| ())?;
@ -309,21 +329,24 @@ impl P2PManager {
stream.read_exact(&mut buf).await.map_err(|_| ())?;
if buf[0] != 1 {
debug!("Spacedrop was rejected from peer '{peer_id}'");
return Ok(());
return Ok(None);
}
debug!("Starting Spacedrop to peer '{peer_id}'");
let i = Instant::now();
let file = BufReader::new(file);
spaceblock::send(
&mut stream,
file,
self.spacedrop_progress.lock().await.insert(id, tx.clone());
Transfer::new(
&match header {
Header::Spacedrop(req) => req,
_ => unreachable!(),
},
|percent| {
tx.send(percent).ok();
},
)
.send(&mut stream, file)
.await;
debug!(
@ -331,7 +354,18 @@ impl P2PManager {
i.elapsed()
);
Ok(())
Ok(Some(id))
}
pub async fn spacedrop_progress(&self, id: Uuid) -> Option<impl Stream<Item = u8>> {
self.spacedrop_progress.lock().await.get(&id).map(|v| {
let mut v = v.subscribe();
async_stream::stream! {
while let Ok(item) = v.recv().await {
yield item;
}
}
})
}
pub async fn shutdown(&self) {

View file

@ -151,66 +151,83 @@ impl<'a> Block<'a> {
}
}
pub async fn send(
stream: &mut (impl AsyncWrite + Unpin),
mut file: (impl AsyncBufRead + Unpin),
req: &SpacedropRequest,
) {
// We manually implement what is basically a `BufReader` so we have more control
let mut buf = vec![0u8; req.block_size.size() as usize];
let mut offset: u64 = 0;
loop {
let read = file.read(&mut buf[..]).await.unwrap(); // TODO: Error handling
offset += read as u64;
if read == 0 {
if offset != req.size {
panic!("U dun goofed"); // TODO: Error handling
}
break;
}
let block = Block {
offset,
size: read as u64,
data: &buf[..read],
};
debug!(
"Sending block at offset {} of size {}",
block.offset, block.size
);
stream.write_all(&block.to_bytes()).await.unwrap(); // TODO: Error handling
}
/// TODO
pub struct Transfer<'a, F> {
req: &'a SpacedropRequest,
on_progress: F,
}
pub async fn receive(
stream: &mut (impl AsyncReadExt + Unpin),
mut file: (impl AsyncWrite + Unpin),
req: &SpacedropRequest,
) {
// We manually implement what is basically a `BufReader` so we have more control
let mut data_buf = vec![0u8; req.block_size.size() as usize];
let mut offset: u64 = 0;
impl<'a, F> Transfer<'a, F>
where
F: Fn(u8) + 'a,
{
pub fn new(req: &'a SpacedropRequest, on_progress: F) -> Self {
Self { req, on_progress }
}
// TODO: Prevent loop being a DOS vector
loop {
// TODO: Timeout if nothing is being received
let block = Block::from_stream(stream, &mut data_buf).await.unwrap(); // TODO: Error handling
offset += block.size;
pub async fn send(
&self,
stream: &mut (impl AsyncWrite + Unpin),
mut file: (impl AsyncBufRead + Unpin),
) {
// We manually implement what is basically a `BufReader` so we have more control
let mut buf = vec![0u8; self.req.block_size.size() as usize];
let mut offset: u64 = 0;
debug!(
"Received block at offset {} of size {}",
block.offset, block.size
);
file.write_all(&data_buf[..block.size as usize])
.await
.unwrap(); // TODO: Error handling
loop {
let read = file.read(&mut buf[..]).await.unwrap(); // TODO: Error handling
offset += read as u64;
(self.on_progress)(((self.req.size / offset) * 100) as u8); // SAFETY: Percent must be between 0 and 100
// TODO: Should this be `read == 0`
if offset == req.size {
break;
if read == 0 {
if offset != self.req.size {
panic!("U dun goofed"); // TODO: Error handling
}
break;
}
let block = Block {
offset,
size: read as u64,
data: &buf[..read],
};
debug!(
"Sending block at offset {} of size {}",
block.offset, block.size
);
stream.write_all(&block.to_bytes()).await.unwrap(); // TODO: Error handling
}
}
pub async fn receive(
&self,
stream: &mut (impl AsyncReadExt + Unpin),
mut file: (impl AsyncWrite + Unpin),
) {
// We manually implement what is basically a `BufReader` so we have more control
let mut data_buf = vec![0u8; self.req.block_size.size() as usize];
let mut offset: u64 = 0;
// TODO: Prevent loop being a DOS vector
loop {
// TODO: Timeout if nothing is being received
let block = Block::from_stream(stream, &mut data_buf).await.unwrap(); // TODO: Error handling
offset += block.size;
(self.on_progress)(((self.req.size / offset) * 100) as u8); // SAFETY: Percent must be between 0 and 100
debug!(
"Received block at offset {} of size {}",
block.offset, block.size
);
file.write_all(&data_buf[..block.size as usize])
.await
.unwrap(); // TODO: Error handling
// TODO: Should this be `read == 0`
if offset == self.req.size {
break;
}
}
}
}
@ -257,14 +274,16 @@ mod tests {
async move {
let file = BufReader::new(Cursor::new(data));
tx.send(()).unwrap();
send(&mut client, file, &req).await;
Transfer::new(&req, |_| {}).send(&mut client, file).await;
}
});
rx.await.unwrap();
let mut result = Vec::new();
receive(&mut server, &mut result, &req).await;
Transfer::new(&req, |_| {})
.receive(&mut server, &mut result)
.await;
assert_eq!(result, data);
}
@ -291,14 +310,16 @@ mod tests {
async move {
let file = BufReader::new(Cursor::new(data));
tx.send(()).unwrap();
send(&mut client, file, &req).await;
Transfer::new(&req, |_| {}).send(&mut client, file).await;
}
});
rx.await.unwrap();
let mut result = Vec::new();
receive(&mut server, &mut result, &req).await;
Transfer::new(&req, |_| {})
.receive(&mut server, &mut result)
.await;
assert_eq!(result, data);
}
}

View file

@ -57,7 +57,7 @@ export type Procedures = {
{ key: "locations.update", input: LibraryArgs<LocationUpdateArgs>, result: null } |
{ key: "nodes.changeNodeName", input: ChangeNodeNameArgs, result: NodeConfig } |
{ key: "p2p.acceptSpacedrop", input: [string, string | null], result: null } |
{ key: "p2p.spacedrop", input: SpacedropArgs, result: null } |
{ key: "p2p.spacedrop", input: SpacedropArgs, result: string | null } |
{ key: "tags.assign", input: LibraryArgs<TagAssignArgs>, result: null } |
{ key: "tags.create", input: LibraryArgs<TagCreateArgs>, result: Tag } |
{ key: "tags.delete", input: LibraryArgs<number>, result: null } |
@ -69,6 +69,7 @@ export type Procedures = {
{ key: "locations.online", input: never, result: number[][] } |
{ key: "locations.quickRescan", input: LibraryArgs<LightScanArgs>, result: null } |
{ key: "p2p.events", input: never, result: P2PEvent } |
{ key: "p2p.spacedropProgress", input: string, result: number } |
{ key: "sync.newMessage", input: LibraryArgs<null>, result: CRDTOperation }
};