p2p: improve error handling for QuicTransport (#2482)

This commit is contained in:
Julian Braha 2024-05-14 04:11:25 +01:00 committed by GitHub
parent 8bfcf58a0d
commit 518d5836f6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 34 additions and 17 deletions

View file

@ -67,7 +67,7 @@ impl P2PManager {
> {
let (tx, rx) = bounded(25);
let p2p = P2P::new(SPACEDRIVE_APP_ID, node_config.get().await.identity, tx);
let (quic, lp2p_peer_id) = QuicTransport::spawn(p2p.clone())?;
let (quic, lp2p_peer_id) = QuicTransport::spawn(p2p.clone()).map_err(|e| e.to_string())?;
let libraries_hook_id = libraries_hook(p2p.clone(), libraries);
let this = Arc::new(Self {
p2p: p2p.clone(),

View file

@ -16,6 +16,7 @@ use libp2p::{
yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
@ -78,6 +79,20 @@ struct MyBehaviour {
dcutr: dcutr::Behaviour,
}
#[derive(Debug, Error)]
pub enum QuicTransportError {
#[error("Failed to modify the SwarmBuilder: {0}")]
SwarmBuilderCreation(String),
#[error("Internal response channel closed: {0}")]
SendChannelClosed(String),
#[error("Internal response channel closed: {0}")]
ReceiveChannelClosed(#[from] oneshot::error::RecvError),
#[error("Failed internal event: {0}")]
InternalEvent(String),
#[error("Failed to create the Listener: {0}")]
ListenerSetup(std::io::Error),
}
/// Transport using Quic to establish a connection between peers.
/// This uses `libp2p` internally.
#[derive(Debug)]
@ -91,8 +106,7 @@ pub struct QuicTransport {
impl QuicTransport {
/// Spawn the `QuicTransport` and register it with the P2P system.
/// Be aware spawning this does nothing unless you call `Self::set_ipv4_enabled`/`Self::set_ipv6_enabled` to enable the listeners.
// TODO: Error type here
pub fn spawn(p2p: Arc<P2P>) -> Result<(Self, Libp2pPeerId), String> {
pub fn spawn(p2p: Arc<P2P>) -> Result<(Self, Libp2pPeerId), QuicTransportError> {
let keypair = identity_to_libp2p_keypair(p2p.identity());
let libp2p_peer_id = Libp2pPeerId(keypair.public().to_peer_id());
@ -108,14 +122,14 @@ impl QuicTransport {
.with_tokio()
.with_quic()
.with_relay_client(noise::Config::new, yamux::Config::default)
.map_err(|err| err.to_string())?
.map_err(|err| QuicTransportError::SwarmBuilderCreation(err.to_string()))?
.with_behaviour(|keypair, relay_behaviour| MyBehaviour {
stream: libp2p_stream::Behaviour::new(),
relay: relay_behaviour,
autonat: autonat::Behaviour::new(keypair.public().to_peer_id(), Default::default()),
dcutr: dcutr::Behaviour::new(keypair.public().to_peer_id()),
})
.map_err(|err| err.to_string())?
.map_err(|err| QuicTransportError::SwarmBuilderCreation(err.to_string()))?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();
@ -163,7 +177,7 @@ impl QuicTransport {
}
// `None` on the port means disabled. Use `0` for random port.
pub async fn set_ipv4_enabled(&self, port: Option<u16>) -> Result<(), String> {
pub async fn set_ipv4_enabled(&self, port: Option<u16>) -> Result<(), QuicTransportError> {
self.setup_listener(
port.map(|p| SocketAddr::from((Ipv4Addr::UNSPECIFIED, p))),
true,
@ -171,7 +185,7 @@ impl QuicTransport {
.await
}
pub async fn set_ipv6_enabled(&self, port: Option<u16>) -> Result<(), String> {
pub async fn set_ipv6_enabled(&self, port: Option<u16>) -> Result<(), QuicTransportError> {
self.setup_listener(
port.map(|p| SocketAddr::from((Ipv6Addr::UNSPECIFIED, p))),
false,
@ -179,18 +193,20 @@ impl QuicTransport {
.await
}
// TODO: Proper error type
async fn setup_listener(&self, addr: Option<SocketAddr>, ipv4: bool) -> Result<(), String> {
async fn setup_listener(
&self,
addr: Option<SocketAddr>,
ipv4: bool,
) -> Result<(), QuicTransportError> {
let (tx, rx) = oneshot::channel();
let event = if let Some(mut addr) = addr {
if addr.port() == 0 {
#[allow(clippy::unwrap_used)] // TODO: Error handling
addr.set_port(
TcpListener::bind(addr)
.await
.unwrap()
.map_err(|e| QuicTransportError::ListenerSetup(e))?
.local_addr()
.unwrap()
.map_err(|e| QuicTransportError::ListenerSetup(e))?
.port(),
);
}
@ -209,12 +225,13 @@ impl QuicTransport {
}
};
let Ok(_) = self.internal_tx.send(event) else {
return Err("internal channel closed".to_string());
};
self.internal_tx
.send(event)
.map_err(|e| QuicTransportError::SendChannelClosed(e.to_string()))?;
rx.await
.map_err(|_| "internal response channel closed".to_string())
.and_then(|r| r)
.map_err(|e| QuicTransportError::ReceiveChannelClosed(e))
.and_then(|r| r.map_err(|e| QuicTransportError::InternalEvent(e)))
}
pub async fn shutdown(self) {