diff --git a/README.md b/README.md index c6875f0b..347a1223 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ use laminar::{Socket, Packet}; // create the socket let mut socket = Socket::bind("127.0.0.1:12345")?; -let packet_sender = socket.get_packet_sender(); +let packet_sender = socket.get_event_sender(); // this will start the socket, which will start a poll mechanism to receive and send messages. let _thread = thread::spawn(move || socket.start_polling()); diff --git a/examples/server_client.rs b/examples/server_client.rs index 3d47793b..a3d9410a 100644 --- a/examples/server_client.rs +++ b/examples/server_client.rs @@ -5,41 +5,48 @@ use std::io::stdin; use std::thread; use std::time::Instant; -use laminar::{ErrorKind, Packet, Socket, SocketEvent}; +use laminar::{ + managers::SimpleConnectionManagerFactory, ConnectionEvent, ErrorKind, Packet, ReceiveEvent, + SendEvent, Socket, +}; const SERVER: &str = "127.0.0.1:12351"; fn server() -> Result<(), ErrorKind> { - let mut socket = Socket::bind(SERVER)?; - let (sender, receiver) = (socket.get_packet_sender(), socket.get_event_receiver()); + // create socket manager, that will use SimpleConnectionManager, that actually initiates connection by exchanging methods + let mut socket = Socket::bind(SERVER, Box::new(SimpleConnectionManagerFactory(false)))?; + let (sender, receiver) = (socket.get_event_sender(), socket.get_event_receiver()); let _thread = thread::spawn(move || socket.start_polling()); loop { - if let Ok(event) = receiver.recv() { + if let Ok(ConnectionEvent(addr, event)) = receiver.recv() { match event { - SocketEvent::Packet(packet) => { + ReceiveEvent::Connected(data) => { + println!( + "{:?} -> Connected msg:{}", + addr, + String::from_utf8_lossy(data.as_ref()) + ); + } + ReceiveEvent::Packet(packet) => { let msg = packet.payload(); - if msg == b"Bye!" { break; } - let msg = String::from_utf8_lossy(msg); - let ip = packet.addr().ip(); - - println!("Received {:?} from {:?}", msg, ip); + println!("{:?} -> Packet msg:{}", addr, msg); sender - .send(Packet::reliable_unordered( + .send(ConnectionEvent( packet.addr(), - "Copy that!".as_bytes().to_vec(), + SendEvent::Packet(Packet::reliable_unordered( + packet.addr(), + [b"Echo: ", msg.as_bytes()].concat(), + )), )) .expect("This should send"); } - SocketEvent::Timeout(address) => { - println!("Client timed out: {}", address); - } - _ => {} + _ => println!("{:?} -> {:?}", addr, event), } } } @@ -49,43 +56,66 @@ fn server() -> Result<(), ErrorKind> { fn client() -> Result<(), ErrorKind> { let addr = "127.0.0.1:12352"; - let mut socket = Socket::bind(addr)?; + let mut socket = Socket::bind(addr, Box::new(SimpleConnectionManagerFactory(false)))?; println!("Connected on {}", addr); - let server = SERVER.parse().unwrap(); + let sender = socket.get_event_sender(); + let _thread = thread::spawn(move || loop { + socket.manual_poll(Instant::now()); - println!("Type a message and press Enter to send. Send `Bye!` to quit."); + if let Some(ConnectionEvent(addr, event)) = socket.recv() { + match event { + ReceiveEvent::Connected(data) => { + println!( + "{:?} -> Connected msg:{}", + addr, + String::from_utf8_lossy(data.as_ref()) + ); + } + ReceiveEvent::Packet(packet) => { + let msg = String::from_utf8_lossy(packet.payload()); + println!("{:?} -> Packet msg:{}", addr, msg); + } + _ => println!("{:?} -> {:?}", addr, event), + } + } + }); let stdin = stdin(); let mut s_buffer = String::new(); + s_buffer.clear(); + + let server = SERVER.parse().unwrap(); + println!("Type a `:c` to connect"); + println!("Type a `` to send a packet"); + println!("Type a `:d` to disconnect"); + println!("Type a `:q` to quit."); loop { - s_buffer.clear(); stdin.read_line(&mut s_buffer)?; let line = s_buffer.replace(|x| x == '\n' || x == '\r', ""); - - socket.send(Packet::reliable_unordered( - server, - line.clone().into_bytes(), - ))?; - - socket.manual_poll(Instant::now()); - - if line == "Bye!" { + if line == ":q" { break; + } else if line.starts_with(":c") { + sender + .send(ConnectionEvent( + server, + SendEvent::Connect(Box::from(line.split_at(2).1.as_bytes())), + )) + .expect("sending should not fail"); + } else if line == ":d" { + sender + .send(ConnectionEvent(server, SendEvent::Disconnect)) + .expect("sending should not fail"); + } else { + sender + .send(ConnectionEvent( + server, + SendEvent::Packet(Packet::reliable_unordered(server, line.into_bytes())), + )) + .expect("sending should not fail"); } - - match socket.recv() { - Some(SocketEvent::Packet(packet)) => { - if packet.addr() == server { - println!("Server sent: {}", String::from_utf8_lossy(packet.payload())); - } else { - println!("Unknown sender."); - } - } - Some(SocketEvent::Timeout(_)) => {} - _ => println!("Silence.."), - } + s_buffer.clear(); } Ok(()) @@ -99,7 +129,7 @@ fn main() -> Result<(), ErrorKind> { let mut s = String::new(); stdin.read_line(&mut s)?; - if s.starts_with("s") { + if s.starts_with('s') { println!("Starting server.."); server() } else { diff --git a/examples/simple_udp.rs b/examples/simple_udp.rs index 678ac9ca..b8074580 100644 --- a/examples/simple_udp.rs +++ b/examples/simple_udp.rs @@ -3,15 +3,18 @@ //! 2. setting up client to send data. //! 3. serialize data to send and deserialize when received. use bincode::{deserialize, serialize}; -use laminar::{Packet, Socket, SocketEvent}; +use laminar::{ + managers::SimpleConnectionManagerFactory, ConnectionEvent, Packet, ReceiveEvent, SendEvent, + Socket, +}; use serde_derive::{Deserialize, Serialize}; use std::net::SocketAddr; use std::time::Instant; /// The socket address of where the server is located. -const SERVER_ADDR: &'static str = "127.0.0.1:12345"; +const SERVER_ADDR: &str = "127.0.0.1:12345"; // The client address from where the data is sent. -const CLIENT_ADDR: &'static str = "127.0.0.1:12346"; +const CLIENT_ADDR: &str = "127.0.0.1:12346"; fn client_address() -> SocketAddr { CLIENT_ADDR.parse().unwrap() @@ -21,40 +24,55 @@ fn server_address() -> SocketAddr { SERVER_ADDR.parse().unwrap() } +// helper function to reduce boiler plate +fn create_packet(addr: SocketAddr, data: &T) -> ConnectionEvent +where + T: serde::Serialize, +{ + ConnectionEvent( + addr, + SendEvent::Packet(Packet::unreliable(addr, serialize(data).unwrap())), + ) +} + /// This will run an simple example with client and server communicating. #[allow(unused_must_use)] pub fn main() { - let mut server = Socket::bind(server_address()).unwrap(); + let mut server = Socket::bind( + server_address(), + Box::new(SimpleConnectionManagerFactory(true)), + ) + .unwrap(); /* setup or `Client` and send some test data. */ - let mut client = Socket::bind(client_address()).unwrap(); - - client.send(Packet::unreliable( + let mut client = Socket::bind( + client_address(), + Box::new(SimpleConnectionManagerFactory(true)), + ) + .unwrap(); + client.send(create_packet( server_address(), - serialize(&DataType::Coords { + &DataType::Coords { latitude: 10.55454, longitude: 10.555, altitude: 1.3, - }) - .unwrap(), + }, )); - client.send(Packet::unreliable( + client.send(create_packet( server_address(), - serialize(&DataType::Coords { + &DataType::Coords { latitude: 3.344, longitude: 5.4545, altitude: 1.33, - }) - .unwrap(), + }, )); - client.send(Packet::unreliable( + client.send(create_packet( server_address(), - serialize(&DataType::Text { + &DataType::Text { string: String::from("Some information"), - }) - .unwrap(), + }, )); // Send the queued send operations @@ -68,11 +86,8 @@ pub fn main() { // Coords { longitude: 5.4545, latitude: 3.344, altitude: 1.33 } // Text { string: "Some information" } while let Some(pkt) = server.recv() { - match pkt { - SocketEvent::Packet(pkt) => { - println!["{:?}", deserialize::(pkt.payload()).unwrap()] - } - _ => {} + if let ConnectionEvent(_addr, ReceiveEvent::Packet(pkt)) = pkt { + println!["{:?}", deserialize::(pkt.payload()).unwrap()] } } } diff --git a/examples/udp.rs b/examples/udp.rs index 72c7db3b..4022e62d 100644 --- a/examples/udp.rs +++ b/examples/udp.rs @@ -2,14 +2,18 @@ //! 1. sending data //! 2. receiving data //! 3. constructing the packet for sending. -use laminar::{Packet, Result, Socket, SocketEvent}; +use laminar::{ + managers::SimpleConnectionManagerFactory, ConnectionEvent, Packet, ReceiveEvent, Result, + SendEvent, Socket, +}; use std::net::SocketAddr; +use std::time::Instant; /// The socket address of where the server is located. -const SERVER_ADDR: &'static str = "127.0.0.1:12345"; +const SERVER_ADDR: &str = "127.0.0.1:12345"; // The client address from where the data is sent. -const CLIENT_ADDR: &'static str = "127.0.0.1:12346"; +const CLIENT_ADDR: &str = "127.0.0.1:12346"; fn client_address() -> SocketAddr { CLIENT_ADDR.parse().unwrap() @@ -20,57 +24,70 @@ fn server_address() -> SocketAddr { } /// This is an example of how to send data to an specific address. -pub fn send_data() -> Result<()> { - // Setup a udp socket and bind it to the client address. - let mut socket = Socket::bind(client_address()).unwrap(); - - let packet = construct_packet(); +pub fn send_data(socket: &mut Socket) -> Result<()> { + let (to_address, packet) = construct_packet(); // next send or packet to the endpoint we earlier putted into the packet. - socket.send(packet) + socket.send(ConnectionEvent(to_address, SendEvent::Packet(packet)))?; + + // this function processes all events and actually sends or receives packets + socket.manual_poll(Instant::now()); + Ok(()) } /// This is an example of how to receive data over udp. -pub fn receive_data() { - // setup an udp socket and bind it to the client address. - let mut socket = Socket::bind(server_address()).unwrap(); - +pub fn receive_data(socket: &mut Socket) { + // this function processes all events and actually sends or receives packets + socket.manual_poll(Instant::now()); // Next start receiving. loop { - if let Some(result) = socket.recv() { - match result { - SocketEvent::Packet(packet) => { - let endpoint: SocketAddr = packet.addr(); - let received_data: &[u8] = packet.payload(); - - // you can here deserialize your bytes into the data you have passed it when sending. - - println!( - "Received packet from: {:?} with length {}", - endpoint, - received_data.len() - ); - } - _ => {} - } + if let Some(ConnectionEvent(_addr, ReceiveEvent::Packet(packet))) = socket.recv() { + let endpoint: SocketAddr = packet.addr(); + let received_data: &[u8] = packet.payload(); + + // you can here deserialize your bytes into the data you have passed it when sending. + + println!( + "Received packet from: {:?} with length {}", + endpoint, + received_data.len() + ); break; } } } /// This is an example of how to construct a packet. -pub fn construct_packet() -> Packet { +pub fn construct_packet() -> (SocketAddr, Packet) { // this is the destination address of the packet. let destination: SocketAddr = server_address(); // lets construct some payload (raw data) for or packet. - let raw_data = "example data".as_bytes(); + let raw_data = b"example data"; // lets construct or packet by passing in the destination for this packet and the bytes needed to be send.. - let packet: Packet = Packet::reliable_unordered(destination, raw_data.to_owned()); + let packet: Packet = Packet::reliable_unordered(destination, raw_data.to_vec()); - packet + (destination, packet) } -// TODO: Use functions in example -fn main() {} +fn main() -> Result<()> { + // Setup a udp socket and bind it to the client address. + // SimpleSocketManager(true) provides connection that immediatelly goes to connected state, after any socket event is received + let mut client = Socket::bind( + client_address(), + Box::new(SimpleConnectionManagerFactory(true)), + ) + .unwrap(); + + // setup an udp socket and bind it to the server address. + let mut server = Socket::bind( + server_address(), + Box::new(SimpleConnectionManagerFactory(true)), + ) + .unwrap(); + + send_data(&mut client)?; + receive_data(&mut server); + Ok(()) +} diff --git a/src/either.rs b/src/either.rs index 5b00ec96..21e7880e 100644 --- a/src/either.rs +++ b/src/either.rs @@ -1,4 +1,5 @@ -pub(crate) enum Either { +#[derive(Debug)] +pub enum Either { Left(L), Right(R), } diff --git a/src/error.rs b/src/error.rs index c6203219..1ac8fca3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,7 @@ //! This module contains the laminar error handling logic. -use crate::SocketEvent; +use crate::net::events::{ConnectionEvent, ReceiveEvent, SendEvent}; +use crate::net::managers::ConnectionManagerError; use crossbeam_channel::SendError; use std::{ error::Error, @@ -26,10 +27,14 @@ pub enum ErrorKind { ReceivedDataToShort, /// Protocol versions did not match ProtocolVersionMismatch, - /// Could not send on `SendChannel`. - SendError(SendError), + /// Could not send a SendEvent from the user. + ChannelSendingError(SendError>), + /// Could not send an ReceiveEvent to the user. + ChannelReceivingError(SendError>), /// Expected header but could not be read from buffer. CouldNotReadHeader(String), + /// Errors that is returned from `ConnectionManager` either preprocessing data or processing packet + ConnectionError(ConnectionManagerError), } impl Display for ErrorKind { @@ -57,9 +62,14 @@ impl Display for ErrorKind { ErrorKind::ProtocolVersionMismatch => { write!(fmt, "The protocol versions do not match.") } - ErrorKind::SendError(e) => write!( + ErrorKind::ChannelSendingError(e) => write!( fmt, - "Could not sent on channel because it was closed. Reason: {:?}", + "Could not sent a SendEvent on channel because it was closed. Reason: {:?}", + e + ), + ErrorKind::ChannelReceivingError(e) => write!( + fmt, + "Could not sent a ReceiveEvent on channel because it was closed. Reason: {:?}", e ), ErrorKind::CouldNotReadHeader(header) => write!( @@ -67,6 +77,11 @@ impl Display for ErrorKind { "Expected {} header but could not be read from buffer.", header ), + ErrorKind::ConnectionError(err) => write!( + fmt, + "Something went wrong in ConnectionManager. Reason: {:?}.", + err + ), } } } @@ -103,6 +118,8 @@ impl Display for DecodingErrorKind { pub enum PacketErrorKind { /// The maximal allowed size of the packet was exceeded ExceededMaxPacketSize, + /// Only user packets (a.k.a PacketType::Packet) can be fragmented + PacketTypeCannotBeFragmented, } impl Display for PacketErrorKind { @@ -111,6 +128,10 @@ impl Display for PacketErrorKind { PacketErrorKind::ExceededMaxPacketSize => { write!(fmt, "The packet size was bigger than the max allowed size.") } + PacketErrorKind::PacketTypeCannotBeFragmented => write!( + fmt, + "Only user packets (PacketType::Packet) can be fragmented." + ), } } } @@ -173,9 +194,21 @@ impl From for ErrorKind { } } -impl From> for ErrorKind { - fn from(inner: SendError) -> Self { - ErrorKind::SendError(inner) +impl From>> for ErrorKind { + fn from(inner: SendError>) -> Self { + ErrorKind::ChannelSendingError(inner) + } +} + +impl From>> for ErrorKind { + fn from(inner: SendError>) -> Self { + ErrorKind::ChannelReceivingError(inner) + } +} + +impl From for ErrorKind { + fn from(inner: ConnectionManagerError) -> Self { + ErrorKind::ConnectionError(inner) } } diff --git a/src/infrastructure/acknowledgment.rs b/src/infrastructure/acknowledgment.rs index b268d58b..a0ce1a6d 100644 --- a/src/infrastructure/acknowledgment.rs +++ b/src/infrastructure/acknowledgment.rs @@ -1,5 +1,4 @@ -use crate::packet::OrderingGuarantee; -use crate::packet::SequenceNumber; +use crate::packet::{OrderingGuarantee, PacketType, SequenceNumber}; use crate::sequence_buffer::{sequence_greater_than, sequence_less_than, SequenceBuffer}; use std::collections::HashMap; @@ -101,6 +100,7 @@ impl AcknowledgmentHandler { /// Enqueue the outgoing packet for acknowledgment. pub fn process_outgoing( &mut self, + packet_type: PacketType, payload: &[u8], ordering_guarantee: OrderingGuarantee, item_identifier: Option, @@ -108,6 +108,7 @@ impl AcknowledgmentHandler { self.sent_packets.insert( self.sequence_number, SentPacket { + packet_type, payload: Box::from(payload), ordering_guarantee, item_identifier, @@ -138,8 +139,9 @@ impl AcknowledgmentHandler { } } -#[derive(Clone, Debug, Default, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq)] pub struct SentPacket { + pub packet_type: PacketType, pub payload: Box<[u8]>, pub ordering_guarantee: OrderingGuarantee, pub item_identifier: Option, diff --git a/src/infrastructure/fragmenter.rs b/src/infrastructure/fragmenter.rs index 47ba8ce2..e5c62ab4 100644 --- a/src/infrastructure/fragmenter.rs +++ b/src/infrastructure/fragmenter.rs @@ -71,7 +71,7 @@ impl Fragmentation { Fragmentation::fragments_needed(payload_length, config.fragment_size) as u8; if num_fragments > config.max_fragments { - Err(FragmentErrorKind::ExceededMaxFragments)?; + return Err(FragmentErrorKind::ExceededMaxFragments.into()); } for fragment_id in 0..num_fragments { @@ -112,16 +112,16 @@ impl Fragmentation { // get entry of previous received fragments let reassembly_data = match self.fragments.get_mut(fragment_header.sequence()) { Some(val) => val, - None => Err(FragmentErrorKind::CouldNotFindFragmentById)?, + None => return Err(FragmentErrorKind::CouldNotFindFragmentById.into()), }; // Got the data if reassembly_data.num_fragments_total != fragment_header.fragment_count() { - Err(FragmentErrorKind::FragmentWithUnevenNumberOfFragemts)? + return Err(FragmentErrorKind::FragmentWithUnevenNumberOfFragemts.into()); } if reassembly_data.fragments_received[usize::from(fragment_header.id())] { - Err(FragmentErrorKind::AlreadyProcessedFragment)? + return Err(FragmentErrorKind::AlreadyProcessedFragment.into()); } // increase number of received fragments and set the specific fragment to received. diff --git a/src/lib.rs b/src/lib.rs index 1f830b77..9be3ec99 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ mod config; mod either; mod error; mod infrastructure; +pub mod managers; mod net; mod packet; mod protocol_version; @@ -39,5 +40,9 @@ pub use self::throughput::ThroughputMonitoring; pub use self::config::Config; pub use self::error::{ErrorKind, Result}; -pub use self::net::{LinkConditioner, Socket, SocketEvent}; +pub use self::net::events::{ + ConnectionEvent, DestroyReason, DisconnectReason, ReceiveEvent, SendEvent, TargetHost, +}; +pub use self::net::managers::{ConnectionManager, ConnectionManagerError}; +pub use self::net::{LinkConditioner, Socket}; pub use self::packet::{DeliveryGuarantee, OrderingGuarantee, Packet}; diff --git a/src/managers.rs b/src/managers.rs new file mode 100644 index 00000000..70e49e6d --- /dev/null +++ b/src/managers.rs @@ -0,0 +1,4 @@ +//! This module provides socket managers. +mod simple; + +pub use self::simple::SimpleConnectionManagerFactory; diff --git a/src/managers/simple.rs b/src/managers/simple.rs new file mode 100644 index 00000000..dba40e0c --- /dev/null +++ b/src/managers/simple.rs @@ -0,0 +1,188 @@ +use crate::net::managers::*; + +use crate::packet::{DeliveryGuarantee, OrderingGuarantee}; +use std::collections::VecDeque; +use std::net::SocketAddr; +use std::time::Instant; + +/// The simplest connection manager, that immediately goes into the connected state, after creating it +#[derive(Debug)] +struct AlwaysConnectedConnectionManager { + // this is used to set initial state as connected when creating an instance. + // we'll take this value on first `update` call + initial_state: Option, +} + +impl Default for AlwaysConnectedConnectionManager { + fn default() -> Self { + Self { + // initialize to connected state on creation. + initial_state: Some(ConnectionState::Connected(Box::default())), + } + } +} + +impl ConnectionManager for AlwaysConnectedConnectionManager { + fn update<'a>( + &mut self, + _buffer: &'a mut [u8], + _time: Instant, + ) -> Option> { + self.initial_state + .take() // on first call state will be moved out. + .map(ConnectionManagerEvent::NewState) + } + + fn preprocess_incoming<'a, 'b>( + &mut self, + data: &'a [u8], + _buffer: &'b mut [u8], + ) -> Result<&'b [u8], ConnectionManagerError> + where + 'a: 'b, + { + Ok(data) + } + + fn postprocess_outgoing<'a, 'b>(&mut self, data: &'a [u8], _buffer: &'b mut [u8]) -> &'b [u8] + where + 'a: 'b, + { + data + } + + fn process_protocol_data(&mut self, _data: &[u8]) -> Result<(), ConnectionManagerError> { + Ok(()) + } + + fn connect(&mut self, _data: Box<[u8]>) {} + + fn disconnect(&mut self) {} +} + +/// Simple connection manager, that actually tries to connect by exchanging 'connect', 'connected', and 'disconnect' messages with the remote host, +#[derive(Debug, Default)] +struct SimpleConnectionManager { + state: ConnectionState, + changes: VecDeque, ConnectionState>>, +} + +impl SimpleConnectionManager { + fn change_state(&mut self, new: ConnectionState) { + if self.state.try_change(&new).is_some() { + self.changes.push_back(Either::Right(self.state.clone())); + } + } + + fn send_packet(&mut self, payload: &[u8]) { + self.changes.push_back(Either::Left(Box::from(payload))); + } + + fn get_packet<'a>(data: Box<[u8]>, buffer: &'a mut [u8]) -> GenericPacket<'a> { + // get result slice + let payload = &mut buffer[0..data.as_ref().len()]; + // copy from buffer what we want to send + payload.copy_from_slice(data.as_ref()); + // create packet + GenericPacket::connection_packet( + payload, + DeliveryGuarantee::Reliable, + OrderingGuarantee::None, + ) + } +} + +impl ConnectionManager for SimpleConnectionManager { + fn update<'a>( + &mut self, + buffer: &'a mut [u8], + _time: Instant, + ) -> Option> { + self.changes + .pop_front() + .take() + .map(move |event| match event { + Either::Left(data) => ConnectionManagerEvent::NewPacket( + SimpleConnectionManager::get_packet(data, buffer), + ), + Either::Right(state) => ConnectionManagerEvent::NewState(state), + }) + } + + fn preprocess_incoming<'a, 'b>( + &mut self, + data: &'a [u8], + _buffer: &'b mut [u8], + ) -> Result<&'b [u8], ConnectionManagerError> + where + 'a: 'b, + { + Ok(data) + } + + fn postprocess_outgoing<'a, 'b>(&mut self, data: &'a [u8], _buffer: &'b mut [u8]) -> &'b [u8] + where + 'a: 'b, + { + data + } + + fn process_protocol_data(&mut self, data: &[u8]) -> Result<(), ConnectionManagerError> { + match self.state { + ConnectionState::Connecting => { + if data.starts_with(b"connect-") { + self.send_packet(b"connected-"); + self.change_state(ConnectionState::Connected(Box::from(data.split_at(8).1))); + } else if data.starts_with(b"connected-") { + self.change_state(ConnectionState::Connected(Box::from(data.split_at(10).1))); + } + } + ConnectionState::Connected(_) => { + if data.eq(b"disconnect-") { + self.change_state(ConnectionState::Disconnected(TargetHost::RemoteHost)); + } + } + _ => panic!("In disconnected nothing can happen"), + } + Ok(()) + } + + fn connect(&mut self, data: Box<[u8]>) { + self.send_packet([b"connect-", data.as_ref()].concat().as_ref()); + } + + fn disconnect(&mut self) { + if let ConnectionState::Connected(_) = self.state { + self.send_packet(b"disconnect-"); + } + self.change_state(ConnectionState::Disconnected(TargetHost::LocalHost)); + } +} + +/// Simplest implementation of socket manager, always accept a connection and never destroy, no matter how many errors connection reports +/// It can create two types of connection managers: +/// * true - creates `AlwaysConnectedConnectionManager` +/// * false - creates `SimpleConnectionManager` +#[derive(Debug)] +pub struct SimpleConnectionManagerFactory(pub bool); + +impl ConnectionManagerFactory for SimpleConnectionManagerFactory { + fn create_remote_connection_manager( + &mut self, + addr: &SocketAddr, + _raw_bytes: &[u8], + ) -> Box { + self.create_local_connection_manager(addr) + } + + fn create_local_connection_manager( + &mut self, + _addr: &SocketAddr, + ) -> Box { + if self.0 { + Box::new(AlwaysConnectedConnectionManager::default()) + } else { + Box::new(SimpleConnectionManager::default()) + } + } +} diff --git a/src/net.rs b/src/net.rs index ede86d33..0ffe22d6 100644 --- a/src/net.rs +++ b/src/net.rs @@ -2,16 +2,20 @@ //! You can think of the socket, connection management, congestion control. mod connection; -mod events; mod link_conditioner; +mod metrics_collector; mod quality; +mod reliability_system; mod socket; mod virtual_connection; pub mod constants; +pub mod events; +pub mod managers; -pub use self::events::SocketEvent; pub use self::link_conditioner::LinkConditioner; +pub use self::metrics_collector::MetricsCollector; pub use self::quality::{NetworkQuality, RttMeasurer}; -pub use self::socket::Socket; +pub use self::reliability_system::{IncomingPackets, OutgoingPackets, ReliabilitySystem}; +pub use self::socket::{Socket, SocketWithConditioner}; pub use self::virtual_connection::VirtualConnection; diff --git a/src/net/connection.rs b/src/net/connection.rs index 2ee26360..86bac45e 100644 --- a/src/net/connection.rs +++ b/src/net/connection.rs @@ -1,96 +1,161 @@ -pub use crate::net::{NetworkQuality, RttMeasurer, VirtualConnection}; +pub use crate::net::{ + managers::ConnectionManager, NetworkQuality, ReliabilitySystem, RttMeasurer, VirtualConnection, +}; +use crate::{ + net::events::{ConnectionEvent, DestroyReason, DisconnectReason, ReceiveEvent}, + net::managers::ConnectionState, + net::socket::SocketWithConditioner, + net::MetricsCollector, + ErrorKind, +}; + +use crossbeam_channel::{self, Sender}; -use crate::config::Config; -use crate::either::Either::{self, Left, Right}; use std::{ collections::HashMap, net::SocketAddr, time::{Duration, Instant}, }; -/// Maintains a registry of active "connections". Essentially, when we receive a packet on the -/// socket from a particular `SocketAddr`, we will track information about it here. +/// Maintains a registry of active "connections". #[derive(Debug)] pub struct ActiveConnections { connections: HashMap, } impl ActiveConnections { + /// Initialized active connection list. pub fn new() -> Self { Self { connections: HashMap::new(), } } - /// Try to get a `VirtualConnection` by address. If the connection does not exist, it will be - /// inserted and returned. - pub fn get_or_insert_connection( + /// Inserts new connection, and calls `update` method on `ConnectionManager` to initialized it. + pub fn insert_and_init_connection( &mut self, - address: SocketAddr, - config: &Config, + connection: VirtualConnection, + socket: &mut SocketWithConditioner, + event_sender: &Sender>, + metrics: &mut MetricsCollector, time: Instant, + tmp_buffer: &mut [u8], ) -> &mut VirtualConnection { - self.connections - .entry(address) - .or_insert_with(|| VirtualConnection::new(address, config, time)) - } + let conn = self + .connections + .entry(connection.remote_address()) + .or_insert(connection); - /// Try to get or create a [VirtualConnection] by address. If the connection does not exist, it will be - /// created and returned, but not inserted into the table of active connections. - pub(crate) fn get_or_create_connection( - &mut self, - address: SocketAddr, - config: &Config, - time: Instant, - ) -> Either<&mut VirtualConnection, VirtualConnection> { - if let Some(connection) = self.connections.get_mut(&address) { - Left(connection) - } else { - Right(VirtualConnection::new(address, config, time)) + conn.update_connection_manager(event_sender, metrics, socket, time, tmp_buffer); + metrics.track_connection_created(&conn.remote_address()); + if let Err(err) = event_sender.send(ConnectionEvent( + conn.remote_address(), + ReceiveEvent::Created, + )) { + metrics.track_connection_error( + &conn.remote_address(), + &ErrorKind::from(err), + "sending connection create event", + ); } + conn } - /// Removes the connection from `ActiveConnections` by socket address. - pub fn remove_connection( - &mut self, - address: &SocketAddr, - ) -> Option<(SocketAddr, VirtualConnection)> { - self.connections.remove_entry(address) + /// Returns `VirtualConnection` or None if it doesn't exists for a given address. + pub fn try_get(&mut self, address: &SocketAddr) -> Option<&mut VirtualConnection> { + self.connections.get_mut(address) } - /// Check for and return `VirtualConnection`s which have been idling longer than `max_idle_time`. - pub fn idle_connections(&mut self, max_idle_time: Duration, time: Instant) -> Vec { - self.connections - .iter() - .filter(|(_, connection)| connection.last_heard(time) >= max_idle_time) - .map(|(address, _)| *address) - .collect() + /// Iterates through all active connections, and `update`s each connection manager. + pub fn update_connections( + &mut self, + sender: &Sender>, + metrics: &mut MetricsCollector, + socket: &mut SocketWithConditioner, + time: Instant, + buffer: &mut [u8], + ) { + self.connections.iter_mut().for_each(|(_, conn)| { + conn.update_connection_manager(sender, metrics, socket, time, buffer) + }); } - /// Get a list of addresses of dead connections - pub fn dead_connections(&mut self) -> Vec { - self.connections - .iter() - .filter(|(_, connection)| connection.should_be_dropped()) - .map(|(address, _)| *address) - .collect() + /// Iterate through all of the connections and check if any of them should be dropped. + /// Remove dropped connections from the active connections. For each connection removed, we will send an event to the `event_sender` channel. + pub fn handle_dead_clients( + &mut self, + time: Instant, + idle_connection_timeout: Duration, + sender: &Sender>, + metrics: &mut MetricsCollector, + ) { + let drop_list: Vec<_> = self + .connections + .iter_mut() + .filter_map(|(_, connection)| { + connection + .should_be_dropped(idle_connection_timeout, time) + .map(|reason| (connection.remote_address(), reason)) + }) + .collect(); + + for (address, reason) in drop_list { + self.remove_connection(&address, sender, metrics, reason, "removing dead clients"); + } } - /// Check for and return `VirtualConnection`s which have not sent anything for a duration of at least `heartbeat_interval`. - pub fn heartbeat_required_connections( + pub fn handle_heartbeat( &mut self, - heartbeat_interval: Duration, time: Instant, - ) -> impl Iterator { - self.connections - .iter_mut() - .filter(move |(_, connection)| connection.last_sent(time) >= heartbeat_interval) - .map(|(_, connection)| connection) + heartbeat_interval: Duration, + socket: &mut SocketWithConditioner, + metrics: &mut MetricsCollector, + ) { + // Iterate over all connections which have not sent a packet for a duration of at least + // `heartbeat_interval` (from config), and send a heartbeat packet to each. + let connections = self.connections.iter_mut(); + connections.for_each(|(_, connection)| { + connection.handle_heartbeat(time, heartbeat_interval, socket, metrics); + }); } - /// Returns true if the given connection exists. - pub fn exists(&self, address: &SocketAddr) -> bool { - self.connections.contains_key(&address) + /// Removes the connection from `ActiveConnections` by socket address, and sends appropriate events. + fn remove_connection( + &mut self, + address: &SocketAddr, + sender: &Sender>, + metrics: &mut MetricsCollector, + reason: DestroyReason, + error_context: &str, + ) -> bool { + if let Some((_, conn)) = self.connections.remove_entry(address) { + if let ConnectionState::Connected(_) = conn.get_current_state() { + if let Err(err) = sender.send(ConnectionEvent( + conn.remote_address(), + ReceiveEvent::Disconnected(DisconnectReason::Destroying(reason.clone())), + )) { + metrics.track_connection_error( + &conn.remote_address(), + &ErrorKind::from(err), + error_context, + ); + } + } + if let Err(err) = sender.send(ConnectionEvent( + conn.remote_address(), + ReceiveEvent::Destroyed(reason), + )) { + metrics.track_connection_error( + &conn.remote_address(), + &ErrorKind::from(err), + error_context, + ); + } + metrics.track_connection_destroyed(address); + true + } else { + false + } } /// Returns the number of connected clients. @@ -102,12 +167,18 @@ impl ActiveConnections { #[cfg(test)] mod tests { + use super::{ActiveConnections, Config}; use std::{ sync::Arc, time::{Duration, Instant}, }; + #[derive(Debug)] + struct DummyConnManager {} + + impl ConnectionManager for DummyConnManager {} + const ADDRESS: &str = "127.0.0.1:12345"; #[test] diff --git a/src/net/events.rs b/src/net/events.rs index 3bd9de54..966eae5d 100644 --- a/src/net/events.rs +++ b/src/net/events.rs @@ -1,15 +1,70 @@ +use crate::net::managers::ConnectionManagerError; use crate::packet::Packet; use std::net::SocketAddr; -/// Events that can occur in `laminar` and that will be pushed through the `event_receiver` returned by `Socket::bind`. -#[derive(Debug, PartialEq)] -pub enum SocketEvent { - /// A packet was received from a client. +/// Events that can occur in `laminar` for a active connection. +#[derive(Debug)] +pub enum ReceiveEvent { + /// When the connection is actually created and added to the active connections list. + /// Next possible event for connection is: `Connected` or `Destroyed`. + Created, + /// When `ConnectionManager` successfully establishes connection. + /// Next possible event is: `Packet` or `Disconnected`. + Connected(Box<[u8]>), + /// When connection is in Connected state, it can actually start receiving packets. + /// Next possible event is: `Packet` or `Disconnected`. + Packet(Packet), + /// When connection, that was previously in a connected state, is disconnected + /// it can either be disconnected by `ConnectionManager` in this case it is a 'clean' disconnect, where the initiator of disconnect is also specified + /// or it can be closed by `SocketManager` if it decides to do so + Disconnected(DisconnectReason), + /// When it is removed from the active connections list. + /// Connection can be destroyed when the disconnect is initiated by `ConnectionManager`, or `SocketManager` decided to destroy it. + Destroyed(DestroyReason), +} + +/// Events that are received from the user. +#[derive(Debug)] +pub enum SendEvent { + /// Initiate connect request, this will call `ConnectionManager.connect` method. + Connect(Box<[u8]>), + /// Send packet to the remote host. Packet(Packet), - /// A new client connected. - /// Clients are uniquely identified by the ip:port combination at this layer. - Connect(SocketAddr), - /// The client has been idling for a configurable amount of time. - /// You can control the timeout in the config. - Timeout(SocketAddr), + /// Initiate disconnect, this will call `ConnectionManager.disconnect` method. + Disconnect, +} + +/// Provides a reason why the connection was destroyed. +#[derive(Debug, PartialEq, Clone)] +pub enum DestroyReason { + /// When `SocketManager` decided to destroy a connection for error that arrived from `ConnectionManager`. + ConnectionError(ConnectionManagerError), + /// After `Config.idle_connection_timeout` connection had no activity. + Timeout, + /// If there are too many non-acked packets in flight `Config.max_packets_in_flight`. + TooManyPacketsInFlight, + /// When `ConnectionManager` changed to `Disconnected` state. + GracefullyDisconnected, } + +/// Provides convenient enum, to specify either Local or Remote host +#[derive(Debug, PartialEq, Clone)] +pub enum TargetHost { + /// Represents the localhost + LocalHost, + /// Represents the remote host + RemoteHost, +} + +/// Disconnect reason, received by connection +#[derive(Debug, PartialEq)] +pub enum DisconnectReason { + /// Disconnect was initiated by the local or remote host + ClosedBy(TargetHost), + /// Socket manager decided to destroy connection for provided reason + Destroying(DestroyReason), +} + +/// Relate send or receive events together with address. +#[derive(Debug)] +pub struct ConnectionEvent(pub SocketAddr, pub Event); diff --git a/src/net/managers.rs b/src/net/managers.rs new file mode 100644 index 00000000..3c322550 --- /dev/null +++ b/src/net/managers.rs @@ -0,0 +1,140 @@ +pub use crate::either::Either; +pub use crate::net::events::{DestroyReason, TargetHost}; +pub use crate::packet::{ + DeliveryGuarantee, GenericPacket, OrderingGuarantee, OutgoingPacket, PacketType, +}; +pub use crate::ErrorKind; +use std::fmt::Debug; +use std::net::SocketAddr; +use std::time::Instant; + +/// At any given moment, any connection can be only in these states. +/// These states are only managed through `ConnectionManager`, and define behaviour for sending and receiving packets. +/// Only these state transition is allowed: +/// | Old | New | +/// | ---------- | ---------- | +/// | Connecting | Connected | +/// | Connecting | Disconnected | +/// | Connected | Disconnected | +/// If these rules are not satisfied, panic! will be called. +/// Each state specifies what can and cannot be done: +/// * Connecting - This is initial state when socket is created, at this moment no packets can be sent or received from user, +/// in this state only `ConnectionManager` is able to receive and sent packets to properly initiate connection. +/// * Connected - Only in this state all packets will be sent or received between peers. +/// * Disconnected - in this state `ConnectionManager` is not able to send or receive any packets. Connection will be destroyed immediatelly. +#[derive(Debug, PartialEq, Clone)] +pub enum ConnectionState { + Connecting, + Connected(Box<[u8]>), + Disconnected(TargetHost), +} + +impl ConnectionState { + /// Tries to change current state and returns old state if successfully changed. + pub fn try_change(&mut self, new: &Self) -> Option { + match (&self, &new) { + (ConnectionState::Connecting, ConnectionState::Connected(_)) + | (ConnectionState::Connecting, ConnectionState::Disconnected(_)) + | (ConnectionState::Connected(_), ConnectionState::Disconnected(_)) => { + Some(std::mem::replace(self, new.clone())) + } + _ => None, + } + } +} + +impl Default for ConnectionState { + fn default() -> Self { + ConnectionState::Connecting + } +} + +/// Generic error type, that is used by ConnectionManager implementation. +#[derive(Debug, PartialEq, Clone)] +pub enum ConnectionManagerError { + /// Something really bad has happened, this is not a recoverable error, and the connection should be destroyed. + Fatal(String), + /// Something unexpected has happened, but the connection is still in a valid state. + /// `SocketManager` can decide when to destroy the connection if two many warnings are propagated from the same connection in a short amount of time. + Warning(String), // TODO: is it enought? or maybe we need more fields? +} + +/// Events that are generated by `ConnectionManager`s update method. +#[derive(Debug)] +pub enum ConnectionManagerEvent<'a> { + /// Generated new packet, that will be sent immediately. + NewPacket(GenericPacket<'a>), + /// `ConnectionState` state changed. + NewState(ConnectionState), + /// Error occured within `ConnectionManager` + Error(ConnectionManagerError), +} + +/// It abstracts pure UDP packets, and allows to implement Connected/Disconnected states. +/// This table summary shows where exactly ConnectionManager sits in between different layers. +/// | Abstraction layer | Capabilities | +/// |-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| +/// | Application | Can receive these events: Created->Connected(data)->Packet(data)->Disconnected(reason)->Destroyed(reason). Can send these events: Connect(data)->Packet->Disconnect. | +/// | ConnectionManager | Receives all, except user packets, and can report state updates, and generate new packets via `update` method | +/// | Laminar | Adds/Removes headers to packets, so that it could provides reliability, ordering, fragmentation, etc.. capabilities. | +/// | ConnectionManager | May change raw incoming and outgoing bytes to apply encryption, compression, etc. | +/// +/// It tries to maintain a valid connection state, and it can't decide when to destroy itself, only when it changes to disconnected, it will be destroyed later. +/// From the point of view of connection manager, laminar's header + payload is interpreted as user data. +/// Distinction between user packet and the protocol-specific packet is encoded in laminar's packet header. +/// Preprocess/Postprocess and Update methods always accept temporary buffer of size `Config.receive_buffer_max_size` that can be used as output. +pub trait ConnectionManager: Debug + Send { + /// When the instance of the connection manager is created, the `update` method will be called, before any other method. + /// This function should be called frequently, even if there are no packets to send or receive. + /// It will always be called last, after all, other methods are called, in the main laminar`s loop. + /// It can generate all kinds of packets: heartbeat, user or connection protocol packets. + /// (maybe heartbeat functionality should be moved here?) + /// It will be called in the loop as long, as it returns any results. E.g. `connect` method may generate multiple results: change state and send packet. + fn update<'a>( + &mut self, + buffer: &'a mut [u8], + time: Instant, + ) -> Option>; + + /// This will be called for all incoming data, including packets that were resent by remote host. + /// If the packet is accepted by laminar's reliability layer `process_protocol_data` will be called immediately. + /// It should return a slice where header + payload exists + fn preprocess_incoming<'a, 'b>( + &mut self, + data: &'a [u8], + buffer: &'b mut [u8], + ) -> Result<&'b [u8], ConnectionManagerError> + where + 'a: 'b; + + /// This will be called for all outgoing data, including packets that are resent. + /// Dropped packets will also go through here. + /// Accepts full packet: header + payload + fn postprocess_outgoing<'a, 'b>(&mut self, data: &'a [u8], buffer: &'b mut [u8]) -> &'b [u8] + where + 'a: 'b; + + /// This will be called only for incoming protocol-specific packets after laminar's reliability layer accepted it. + /// This is a convenient place to process actual logic because it is filtered by laminar's reliability layer and it accepts only `PacketType::Connection` messages. + fn process_protocol_data(&mut self, data: &[u8]) -> Result<(), ConnectionManagerError>; + + /// This will be invoked when a user sends connect request, + /// Some protocols might provide a way to pass initial connection data, hence the `data` field. + /// This method can only be called when the connection is in `Connecting` state + fn connect(&mut self, data: Box<[u8]>); + + /// This will be invoked when a user sends SendEvent::Disconnect request. + fn disconnect(&mut self); +} + +/// Factory that knows how to create a ConnectionManager for a connection. +pub trait ConnectionManagerFactory: Debug + Send { + /// This is invoked when a message from unknown address arrives. + fn create_remote_connection_manager( + &mut self, + addr: &SocketAddr, + raw_bytes: &[u8], + ) -> Box; + /// This is invoked when any user event is received for unknown address. + fn create_local_connection_manager(&mut self, addr: &SocketAddr) -> Box; +} diff --git a/src/net/metrics_collector.rs b/src/net/metrics_collector.rs new file mode 100644 index 00000000..b1b519c3 --- /dev/null +++ b/src/net/metrics_collector.rs @@ -0,0 +1,35 @@ +use crate::error::ErrorKind; + +use log::error; +use std::io::ErrorKind::WouldBlock; +use std::net::SocketAddr; + +/// Tracks all sorts of global statistics +// TODO write implementation of this +#[derive(Debug)] +pub struct MetricsCollector {} + +impl MetricsCollector { + pub fn track_connection_error( + &mut self, + addr: &SocketAddr, + error: &ErrorKind, + error_context: &str, + ) { + match error { + ErrorKind::IOError(ref e) if e.kind() == WouldBlock => {} + _ => error!("Error, {} ({:?}): {:?}", error_context, addr, error), + } + } + pub fn track_global_error(&mut self, error: &ErrorKind, error_context: &str) { + error!("Error, {}: {:?}", error_context, error); + } + + pub fn track_sent_bytes(&mut self, _addr: &SocketAddr, _bytes: usize) {} + pub fn track_received_bytes(&mut self, _addr: &SocketAddr, _bytes: usize) {} + pub fn track_ignored_bytes(&mut self, _addr: &SocketAddr, _bytes: usize) {} + + pub fn track_connection_created(&mut self, _addr: &SocketAddr) {} + + pub fn track_connection_destroyed(&mut self, _addr: &SocketAddr) {} +} diff --git a/src/net/reliability_system.rs b/src/net/reliability_system.rs new file mode 100644 index 00000000..116a7109 --- /dev/null +++ b/src/net/reliability_system.rs @@ -0,0 +1,999 @@ +use crate::{ + config::Config, + either::Either, + error::{ErrorKind, PacketErrorKind, Result}, + infrastructure::{ + arranging::{Arranging, ArrangingSystem, OrderingSystem, SequencingSystem}, + AcknowledgmentHandler, CongestionHandler, Fragmentation, SentPacket, + }, + net::constants::{ + ACKED_PACKET_HEADER, DEFAULT_ORDERING_STREAM, DEFAULT_SEQUENCING_STREAM, + STANDARD_HEADER_SIZE, + }, + packet::{ + DeliveryGuarantee, GenericPacket, OrderingGuarantee, OutgoingPacket, OutgoingPacketBuilder, + Packet, PacketReader, PacketType, SequenceNumber, + }, +}; + +use std::collections::VecDeque; + +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +/// Helper class that implement Iterator, and is used to return incomming (from bytes to packets) or outgoing (from packet to bytes) packets. +/// It is used as optimization in cases, where most of the time there is only one element to iterate, and we don't want to create vector for it +pub struct ZeroOrMore { + pub data: Either, VecDeque>, +} + +impl ZeroOrMore { + pub fn zero() -> Self { + Self { + data: Either::Left(None), + } + } + + pub fn one(data: T) -> Self { + Self { + data: Either::Left(Some(data)), + } + } + + pub fn many(vec: VecDeque) -> Self { + Self { + data: Either::Right(vec), + } + } +} + +impl Iterator for ZeroOrMore { + type Item = T; + + fn next(&mut self) -> Option { + match &mut self.data { + Either::Left(option) => option.take(), + Either::Right(vec) => vec.pop_front(), + } + } +} + +/// Stores packets with headers that will be sent to network +pub struct OutgoingPackets<'a> { + data: ZeroOrMore>, +} + +impl<'a> OutgoingPackets<'a> { + pub fn one(packet: OutgoingPacket<'a>) -> Self { + Self { + data: ZeroOrMore::one(packet), + } + } + pub fn many(packets: VecDeque>) -> Self { + Self { + data: ZeroOrMore::many(packets), + } + } +} + +impl<'a> IntoIterator for OutgoingPackets<'a> { + type Item = OutgoingPacket<'a>; + type IntoIter = ZeroOrMore; + + fn into_iter(self) -> Self::IntoIter { + self.data + } +} + +/// Stores parsed packets with their types, that was received from network +pub struct IncomingPackets { + data: ZeroOrMore<(Packet, PacketType)>, +} + +impl IncomingPackets { + pub fn zero() -> Self { + Self { + data: ZeroOrMore::zero(), + } + } + + pub fn one(packet: Packet, packet_type: PacketType) -> Self { + Self { + data: ZeroOrMore::one((packet, packet_type)), + } + } + + pub fn many(vec: VecDeque<(Packet, PacketType)>) -> Self { + Self { + data: ZeroOrMore::many(vec), + } + } +} + +impl IntoIterator for IncomingPackets { + type Item = (Packet, PacketType); + type IntoIter = ZeroOrMore; + + fn into_iter(self) -> Self::IntoIter { + self.data + } +} + +/// Keeps reliability information about connections. +/// It exposes various, connection quality related functions, and provides functions that assembles and disassembles packet with reliability information in it. +pub struct ReliabilitySystem { + last_heard: Instant, + last_sent: Instant, + ordering_system: OrderingSystem<(Box<[u8]>, PacketType)>, + sequencing_system: SequencingSystem>, + acknowledge_handler: AcknowledgmentHandler, + congestion_handler: CongestionHandler, + + config: Config, + fragmentation: Fragmentation, +} + +impl ReliabilitySystem { + /// Creates and returns a new `VirtualConnection` that wraps the provided socket address + pub fn new(config: &Config, time: Instant) -> ReliabilitySystem { + ReliabilitySystem { + last_heard: time, + last_sent: time, + ordering_system: OrderingSystem::new(), + sequencing_system: SequencingSystem::new(), + acknowledge_handler: AcknowledgmentHandler::new(), + congestion_handler: CongestionHandler::new(config), + fragmentation: Fragmentation::new(config), + config: config.to_owned(), + } + } + + /// Determine if this connection should be dropped due to its state + pub fn should_be_dropped(&self) -> bool { + self.acknowledge_handler.packets_in_flight() > self.config.max_packets_in_flight + } + + /// Returns a [Duration] representing the interval since we last heard from the client + pub fn last_heard(&self, time: Instant) -> Duration { + // TODO: Replace with saturating_duration_since once it becomes stable. + // This function panics if the user supplies a time instant earlier than last_heard. + time.duration_since(self.last_heard) + } + + /// Returns a [Duration] representing the interval since we last sent to the client + pub fn last_sent(&self, time: Instant) -> Duration { + // TODO: Replace with saturating_duration_since once it becomes stable. + // This function panics if the user supplies a time instant earlier than last_heard. + time.duration_since(self.last_sent) + } + + /// Constructs outgoing packet(s) from dropped packet. + pub fn process_dropped<'a>( + &mut self, + packet: &'a SentPacket, + time: Instant, + ) -> Result> { + self.process_outgoing_impl( + GenericPacket { + packet_type: packet.packet_type, + payload: &packet.payload, + // Because a delivery guarantee is only sent with reliable packets + delivery: DeliveryGuarantee::Reliable, + // This is stored with the dropped packet because they could be mixed + ordering: packet.ordering_guarantee, + }, + packet.item_identifier, + time, + ) + } + + /// Constructs outgoing packet(s) with provided reliability information, for sending over the network. + pub fn process_outgoing<'a>( + &mut self, + packet: GenericPacket<'a>, + time: Instant, + ) -> Result> { + self.last_sent = time; + self.process_outgoing_impl(packet, None, time) + } + + fn process_outgoing_impl<'a>( + &mut self, + packet: GenericPacket<'a>, + last_item_identifier: Option, + time: Instant, + ) -> Result> { + match packet.delivery { + DeliveryGuarantee::Unreliable => { + if packet.payload.len() <= self.config.receive_buffer_max_size { + if packet.packet_type == PacketType::Heartbeat { + // TODO (bug?) is this required here? + self.congestion_handler + .process_outgoing(self.acknowledge_handler.local_sequence_num(), time); + } + + let mut builder = OutgoingPacketBuilder::new(packet.payload) + .with_default_header(packet.packet_type, packet.delivery, packet.ordering); + + if let OrderingGuarantee::Sequenced(stream_id) = packet.ordering { + let item_identifier = self + .sequencing_system + .get_or_create_stream(stream_id.unwrap_or(DEFAULT_SEQUENCING_STREAM)) + .new_item_identifier(); + + builder = builder.with_sequencing_header(item_identifier as u16, stream_id); + }; + + Ok(OutgoingPackets::one(builder.build())) + } else { + Err(ErrorKind::PacketError( + PacketErrorKind::ExceededMaxPacketSize, + )) + } + } + DeliveryGuarantee::Reliable => { + let payload_length = packet.payload.len() as u16; + + let mut item_identifier_value = None; + let outgoing = { + // spit the packet if the payload length is greater than the allowed fragment size. + if payload_length <= self.config.fragment_size { + let mut builder = OutgoingPacketBuilder::new(packet.payload) + .with_default_header( + packet.packet_type, + packet.delivery, + packet.ordering, + ); + + builder = builder.with_acknowledgment_header( + self.acknowledge_handler.local_sequence_num(), + self.acknowledge_handler.remote_sequence_num(), + self.acknowledge_handler.ack_bitfield(), + ); + + if let OrderingGuarantee::Ordered(stream_id) = packet.ordering { + let item_identifier = + if let Some(item_identifier) = last_item_identifier { + item_identifier + } else { + self.ordering_system + .get_or_create_stream( + stream_id.unwrap_or(DEFAULT_ORDERING_STREAM), + ) + .new_item_identifier() + }; + + item_identifier_value = Some(item_identifier); + + builder = builder.with_ordering_header(item_identifier, stream_id); + }; + + if let OrderingGuarantee::Sequenced(stream_id) = packet.ordering { + let item_identifier = + if let Some(item_identifier) = last_item_identifier { + item_identifier + } else { + self.sequencing_system + .get_or_create_stream( + stream_id.unwrap_or(DEFAULT_SEQUENCING_STREAM), + ) + .new_item_identifier() + }; + + item_identifier_value = Some(item_identifier); + + builder = builder.with_sequencing_header(item_identifier, stream_id); + }; + + OutgoingPackets::one(builder.build()) + } else { + if packet.packet_type != PacketType::Packet { + return Err(ErrorKind::PacketError( + PacketErrorKind::PacketTypeCannotBeFragmented, + )); + } + OutgoingPackets::many( + Fragmentation::spit_into_fragments(packet.payload, &self.config)? + .into_iter() + .enumerate() + .map(|(fragment_id, fragment)| { + let fragments_needed = Fragmentation::fragments_needed( + payload_length, + self.config.fragment_size, + ) + as u8; + + let mut builder = OutgoingPacketBuilder::new(fragment) + .with_default_header( + PacketType::Fragment, + packet.delivery, + packet.ordering, + ); + + builder = builder.with_fragment_header( + self.acknowledge_handler.local_sequence_num(), + fragment_id as u8, + fragments_needed, + ); + if fragment_id == 0 { + // TODO (bug?) why there is no ordering/sequencing for fragmented packet? + builder = builder.with_acknowledgment_header( + self.acknowledge_handler.local_sequence_num(), + self.acknowledge_handler.remote_sequence_num(), + self.acknowledge_handler.ack_bitfield(), + ); + } + + builder.build() + }) + .collect(), + ) + } + }; + + self.congestion_handler + .process_outgoing(self.acknowledge_handler.local_sequence_num(), time); + self.acknowledge_handler.process_outgoing( + packet.packet_type, + packet.payload, + packet.ordering, + item_identifier_value, + ); + + Ok(outgoing) + } + } + } + + /// Constructs incoming packet(s) from bytes. + pub fn process_incoming( + &mut self, + addr: SocketAddr, + received_data: &[u8], + time: Instant, + ) -> Result { + self.last_heard = time; + let mut packet_reader = PacketReader::new(received_data); + + let header = packet_reader.read_standard_header()?; + + if !header.is_current_protocol() { + return Err(ErrorKind::ProtocolVersionMismatch); + } + + if header.packet_type() == PacketType::Heartbeat { + // Heartbeat packets are unreliable, unordered and empty packets. + // We already updated our `self.last_heard` time, nothing else to be done. + return Ok(IncomingPackets::zero()); + } + + match header.delivery_guarantee() { + DeliveryGuarantee::Unreliable => { + if let OrderingGuarantee::Sequenced(_id) = header.ordering_guarantee() { + let arranging_header = + packet_reader.read_arranging_header(u16::from(STANDARD_HEADER_SIZE))?; + + let payload = packet_reader.read_payload(); + + let stream = self + .sequencing_system + .get_or_create_stream(arranging_header.stream_id()); + + if let Some(packet) = stream.arrange(arranging_header.arranging_id(), payload) { + return Ok(IncomingPackets::one( + Packet::new( + addr, + packet, + header.delivery_guarantee(), + OrderingGuarantee::Sequenced(Some(arranging_header.stream_id())), + ), + header.packet_type(), + )); + } + + return Ok(IncomingPackets::zero()); + } + return Ok(IncomingPackets::one( + Packet::new( + addr, + packet_reader.read_payload(), + header.delivery_guarantee(), + header.ordering_guarantee(), + ), + header.packet_type(), + )); + } + DeliveryGuarantee::Reliable => { + if header.packet_type() == PacketType::Fragment { + if let Ok((fragment_header, acked_header)) = packet_reader.read_fragment() { + let payload = packet_reader.read_payload(); + + if let Some(acked_header) = acked_header { + // TODO (bug?) should we also check for sequencing/ordering? + self.congestion_handler + .process_incoming(acked_header.sequence()); + self.acknowledge_handler.process_incoming( + acked_header.sequence(), + acked_header.ack_seq(), + acked_header.ack_field(), + ); + } + + match self + .fragmentation + .handle_fragment(fragment_header, &payload) + { + Ok(Some(payload)) => { + return Ok(IncomingPackets::one( + Packet::new( + addr, + payload.into_boxed_slice(), + header.delivery_guarantee(), + OrderingGuarantee::None, + ), + PacketType::Packet, // change to `Packet`, because we do inverse action in `process_outgoing`. + )); + } + Ok(None) => return Ok(IncomingPackets::zero()), + Err(e) => return Err(e), + }; + } + } else { + let acked_header = packet_reader.read_acknowledge_header()?; + + self.congestion_handler + .process_incoming(acked_header.sequence()); + self.acknowledge_handler.process_incoming( + acked_header.sequence(), + acked_header.ack_seq(), + acked_header.ack_field(), + ); + + if let OrderingGuarantee::Sequenced(_) = header.ordering_guarantee() { + let arranging_header = packet_reader.read_arranging_header(u16::from( + STANDARD_HEADER_SIZE + ACKED_PACKET_HEADER, + ))?; + + let payload = packet_reader.read_payload(); + + let stream = self + .sequencing_system + .get_or_create_stream(arranging_header.stream_id()); + + if let Some(packet) = + stream.arrange(arranging_header.arranging_id(), payload) + { + return Ok(IncomingPackets::one( + Packet::new( + addr, + packet, + header.delivery_guarantee(), + OrderingGuarantee::Sequenced(Some( + arranging_header.stream_id(), + )), + ), + header.packet_type(), + )); + } + } else if let OrderingGuarantee::Ordered(_id) = header.ordering_guarantee() { + let arranging_header = packet_reader.read_arranging_header(u16::from( + STANDARD_HEADER_SIZE + ACKED_PACKET_HEADER, + ))?; + + let payload = packet_reader.read_payload(); + + let stream = self + .ordering_system + .get_or_create_stream(arranging_header.stream_id()); + let arranged_packet = stream.arrange( + arranging_header.arranging_id(), + (payload, header.packet_type()), + ); + return Ok(IncomingPackets::many( + arranged_packet + .into_iter() + .chain(stream.iter_mut()) + .map(|(packet, packet_type)| { + ( + Packet::new( + addr, + packet, + header.delivery_guarantee(), + OrderingGuarantee::Ordered(Some( + arranging_header.stream_id(), + )), + ), + packet_type, + ) + }) + .collect(), + )); + } else { + let payload = packet_reader.read_payload(); + return Ok(IncomingPackets::one( + Packet::new( + addr, + payload, + header.delivery_guarantee(), + header.ordering_guarantee(), + ), + header.packet_type(), + )); + } + } + } + } + + Ok(IncomingPackets::zero()) + } + + /// This will gather dropped packets from the acknowledgment handler. + /// + /// Note that after requesting dropped packets the dropped packets will be removed from this client. + pub fn gather_dropped_packets(&mut self) -> Vec { + self.acknowledge_handler.dropped_packets() + } +} + +#[cfg(test)] +mod tests { + use super::VirtualConnection; + use crate::config::Config; + use crate::net::constants; + use crate::packet::header::{AckedPacketHeader, ArrangingHeader, HeaderWriter, StandardHeader}; + use crate::packet::{DeliveryGuarantee, OrderingGuarantee, Outgoing, Packet, PacketType}; + use crate::protocol_version::ProtocolVersion; + use crate::SocketEvent; + use byteorder::{BigEndian, WriteBytesExt}; + use crossbeam_channel::{unbounded, TryRecvError}; + use std::io::Write; + use std::time::Instant; + + const PAYLOAD: [u8; 4] = [1, 2, 3, 4]; + + #[test] + fn assure_right_fragmentation() { + let mut protocol_version = Vec::new(); + protocol_version + .write_u16::(ProtocolVersion::get_crc16()) + .unwrap(); + + let standard_header = [protocol_version, vec![1, 1, 2]].concat(); + + let acked_header = vec![1, 0, 0, 2, 0, 0, 0, 3]; + let first_fragment = vec![0, 1, 1, 3]; + let second_fragment = vec![0, 1, 2, 3]; + let third_fragment = vec![0, 1, 3, 3]; + + let (tx, rx) = unbounded::(); + + let mut connection = create_virtual_connection(); + connection + .process_incoming( + [standard_header.as_slice(), acked_header.as_slice()] + .concat() + .as_slice(), + &tx, + Instant::now(), + ) + .unwrap(); + assert!(rx.try_recv().is_err()); + connection + .process_incoming( + [ + standard_header.as_slice(), + first_fragment.as_slice(), + &PAYLOAD, + ] + .concat() + .as_slice(), + &tx, + Instant::now(), + ) + .unwrap(); + assert!(rx.try_recv().is_err()); + connection + .process_incoming( + [ + standard_header.as_slice(), + second_fragment.as_slice(), + &PAYLOAD, + ] + .concat() + .as_slice(), + &tx, + Instant::now(), + ) + .unwrap(); + assert!(rx.try_recv().is_err()); + connection + .process_incoming( + [ + standard_header.as_slice(), + third_fragment.as_slice(), + &PAYLOAD, + ] + .concat() + .as_slice(), + &tx, + Instant::now(), + ) + .unwrap(); + + let complete_fragment = rx.try_recv().unwrap(); + + match complete_fragment { + SocketEvent::Packet(fragment) => assert_eq!( + fragment.payload(), + &*[PAYLOAD, PAYLOAD, PAYLOAD].concat().into_boxed_slice() + ), + _ => { + panic!("Expected fragment other result."); + } + } + } + + #[test] + fn expect_fragmentation() { + let mut connection = create_virtual_connection(); + + let buffer = vec![1; 4000]; + + let outgoing = connection + .process_outgoing( + &buffer, + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(None), + None, + Instant::now(), + ) + .unwrap(); + + match outgoing { + Outgoing::Packet(_) => panic!("Expected fragment got packet"), + Outgoing::Fragments(fragments) => { + assert_eq!(fragments.len(), 4); + } + } + } + + #[test] + fn assure_correct_outgoing_processing() { + let mut connection = create_virtual_connection(); + + let buffer = vec![1; 1000]; + + connection + .process_outgoing( + &buffer, + DeliveryGuarantee::Unreliable, + OrderingGuarantee::None, + None, + Instant::now(), + ) + .unwrap(); + + connection + .process_outgoing( + &buffer, + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(None), + None, + Instant::now(), + ) + .unwrap(); + + connection + .process_outgoing( + &buffer, + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(None), + None, + Instant::now(), + ) + .unwrap(); + + connection + .process_outgoing( + &buffer, + DeliveryGuarantee::Reliable, + OrderingGuarantee::Sequenced(None), + None, + Instant::now(), + ) + .unwrap(); + } + + #[test] + fn assure_right_sequencing() { + let mut connection = create_virtual_connection(); + + assert_incoming_with_order( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::unreliable_sequenced( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 1, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::unreliable_sequenced( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 3, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Err(TryRecvError::Empty), + 2, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::unreliable_sequenced( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 4, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::reliable_sequenced( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 5, + ); + } + + #[test] + fn assure_right_ordering() { + let mut connection = create_virtual_connection(); + + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::reliable_ordered( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 0, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(Some(1)), + &mut connection, + Err(TryRecvError::Empty), + 2, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(Some(1)), + &mut connection, + Err(TryRecvError::Empty), + 3, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::reliable_ordered( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 1, + ); + } + + #[test] + fn assure_correct_processing_of_incoming() { + let mut connection = create_virtual_connection(); + + assert_incoming_without_order( + DeliveryGuarantee::Unreliable, + &mut connection, + SocketEvent::Packet(Packet::unreliable(get_fake_addr(), PAYLOAD.to_vec())), + ); + + assert_incoming_without_order( + DeliveryGuarantee::Reliable, + &mut connection, + SocketEvent::Packet(Packet::reliable_unordered( + get_fake_addr(), + PAYLOAD.to_vec(), + )), + ); + + assert_incoming_with_order( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::unreliable_sequenced( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 1, + ); + + assert_incoming_with_order( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(Some(1)), + &mut connection, + Ok(SocketEvent::Packet(Packet::reliable_ordered( + get_fake_addr(), + PAYLOAD.to_vec(), + Some(1), + ))), + 0, + ); + } + + #[test] + fn assure_right_header_size() { + assert_right_header_size( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::None, + (constants::STANDARD_HEADER_SIZE) as usize, + ); + assert_right_header_size( + DeliveryGuarantee::Unreliable, + OrderingGuarantee::Sequenced(None), + (constants::STANDARD_HEADER_SIZE + constants::ARRANGING_PACKET_HEADER) as usize, + ); + assert_right_header_size( + DeliveryGuarantee::Reliable, + OrderingGuarantee::None, + (constants::STANDARD_HEADER_SIZE + constants::ACKED_PACKET_HEADER) as usize, + ); + assert_right_header_size( + DeliveryGuarantee::Reliable, + OrderingGuarantee::Ordered(None), + (constants::STANDARD_HEADER_SIZE + + constants::ACKED_PACKET_HEADER + + constants::ARRANGING_PACKET_HEADER) as usize, + ); + } + + /// ======= helper functions ========= + fn create_virtual_connection() -> VirtualConnection { + VirtualConnection::new(get_fake_addr(), &Config::default(), Instant::now()) + } + + fn get_fake_addr() -> std::net::SocketAddr { + "127.0.0.1:0".parse().unwrap() + } + + // assert that the processing of the given `DeliveryGuarantee` and `OrderingGuarantee` results into the given `result_event` + fn assert_incoming_with_order( + delivery: DeliveryGuarantee, + ordering: OrderingGuarantee, + connection: &mut VirtualConnection, + result_event: Result, + order_id: u16, + ) { + let mut packet = Vec::new(); + + // configure the right header based on specified guarantees. + let header = StandardHeader::new(delivery, ordering, PacketType::Packet); + header.parse(&mut packet).unwrap(); + + if let OrderingGuarantee::Sequenced(val) = ordering { + if delivery == DeliveryGuarantee::Reliable { + let ack_header = AckedPacketHeader::new(1, 2, 3); + ack_header.parse(&mut packet).unwrap(); + } + + let order_header = ArrangingHeader::new(order_id, val.unwrap()); + order_header.parse(&mut packet).unwrap(); + } + + if let OrderingGuarantee::Ordered(val) = ordering { + if delivery == DeliveryGuarantee::Reliable { + let ack_header = AckedPacketHeader::new(1, 2, 3); + let order_header = ArrangingHeader::new(order_id, val.unwrap()); + ack_header.parse(&mut packet).unwrap(); + order_header.parse(&mut packet).unwrap(); + } + } + + if let OrderingGuarantee::None = ordering { + if delivery == DeliveryGuarantee::Reliable { + let ack_header = AckedPacketHeader::new(1, 2, 3); + ack_header.parse(&mut packet).unwrap(); + } + } + + packet.write_all(&PAYLOAD).unwrap(); + + let (tx, rx) = unbounded::(); + + connection + .process_incoming(packet.as_slice(), &tx, Instant::now()) + .unwrap(); + + let event = rx.try_recv(); + + match event { + Ok(val) => assert_eq!(val, result_event.unwrap()), + Err(e) => assert_eq!(e, result_event.err().unwrap()), + } + } + + // assert that the given `DeliveryGuarantee` results into the given `SocketEvent` after processing. + fn assert_incoming_without_order( + delivery: DeliveryGuarantee, + connection: &mut VirtualConnection, + result_event: SocketEvent, + ) { + let mut packet = Vec::new(); + + // configure the right header based on specified guarantees. + let header = StandardHeader::new(delivery, OrderingGuarantee::None, PacketType::Packet); + header.parse(&mut packet).unwrap(); + + if delivery == DeliveryGuarantee::Reliable { + let ack_header = AckedPacketHeader::new(1, 2, 3); + ack_header.parse(&mut packet).unwrap(); + } + + packet.write_all(&PAYLOAD).unwrap(); + + let (tx, rx) = unbounded::(); + + connection + .process_incoming(packet.as_slice(), &tx, Instant::now()) + .unwrap(); + + let event = rx.try_recv(); + + assert_eq!(event, Ok(result_event)); + } + + // assert that the size of the processed header is the same as the given one. + fn assert_right_header_size( + delivery: DeliveryGuarantee, + ordering: OrderingGuarantee, + expected_header_size: usize, + ) { + let mut connection = create_virtual_connection(); + + let buffer = vec![1; 500]; + + let outgoing = connection + .process_outgoing(&buffer, delivery, ordering, None, Instant::now()) + .unwrap(); + + match outgoing { + Outgoing::Packet(packet) => { + assert_eq!(packet.contents().len() - buffer.len(), expected_header_size); + } + Outgoing::Fragments(_) => panic!("Expected packet got fragment"), + } + } +} diff --git a/src/net/socket.rs b/src/net/socket.rs index f1a7ce96..2e312429 100644 --- a/src/net/socket.rs +++ b/src/net/socket.rs @@ -1,12 +1,15 @@ -use crate::either::Either::{Left, Right}; use crate::{ config::Config, error::{ErrorKind, Result}, - net::{connection::ActiveConnections, events::SocketEvent, link_conditioner::LinkConditioner}, - packet::{DeliveryGuarantee, Outgoing, Packet}, + net::events::{ConnectionEvent, ReceiveEvent, SendEvent}, + net::managers::{ConnectionManager, ConnectionManagerFactory, ConnectionState}, + net::MetricsCollector, + net::{ + connection::ActiveConnections, link_conditioner::LinkConditioner, ReliabilitySystem, + VirtualConnection, + }, }; -use crossbeam_channel::{self, unbounded, Receiver, SendError, Sender, TryRecvError}; -use log::error; +use crossbeam_channel::{self, unbounded, Receiver, Sender, TryRecvError}; use std::{ self, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4, ToSocketAddrs, UdpSocket}, @@ -14,19 +17,64 @@ use std::{ time::{Duration, Instant}, }; +// just wrap whese too together, for easier passing around +#[derive(Debug)] +pub struct SocketWithConditioner { + postprocess_buffer: Vec, // this buffer is used for postprocess_outgoing + socket: UdpSocket, + link_conditioner: Option, +} + +impl SocketWithConditioner { + /// Send a single packet over the UDP socket. + /// Postprocess it using Connectionmanager`s postprocess_outgoing method, + /// and use link condition if exists, to simulate network conditions + pub fn send_packet_and_log( + &mut self, + addr: &SocketAddr, + conn_man: &mut dyn ConnectionManager, + payload: &[u8], + metrics: &mut MetricsCollector, + error_context: &str, + ) { + match self.send_packet(addr, conn_man, payload) { + Ok(bytes) => metrics.track_sent_bytes(addr, bytes), + Err(ref err) => metrics.track_connection_error(addr, err, error_context), + } + } + + fn send_packet( + &mut self, + addr: &SocketAddr, + manager: &mut dyn ConnectionManager, + payload: &[u8], + ) -> Result { + let payload = manager.postprocess_outgoing(payload, self.postprocess_buffer.as_mut_slice()); + if let Some(ref mut link) = self.link_conditioner { + if !link.should_send() { + return Ok(0); + } + } + Ok(self.socket.send_to(payload, addr)?) + } +} + /// A reliable UDP socket implementation with configurable reliability and ordering guarantees. #[derive(Debug)] pub struct Socket { - socket: UdpSocket, + socket: SocketWithConditioner, config: Config, connections: ActiveConnections, recv_buffer: Vec, - link_conditioner: Option, - event_sender: Sender, - packet_receiver: Receiver, + preprocess_buffer: Vec, // this is temporary buffer, used by connection manager, in cases where it needs to modify incomming raw bytes, or create new packets for sending - receiver: Receiver, - sender: Sender, + event_sender: Sender>, + packet_receiver: Receiver>, + + receiver: Receiver>, + sender: Sender>, + metrics: MetricsCollector, + conn_manager_factory: Box, } enum UdpSocketState { @@ -38,21 +86,27 @@ impl Socket { /// Binds to the socket and then sets up `ActiveConnections` to manage the "connections". /// Because UDP connections are not persistent, we can only infer the status of the remote /// endpoint by looking to see if they are still sending packets or not - pub fn bind(addresses: A) -> Result { - Socket::bind_with_config(addresses, Config::default()) + pub fn bind( + addresses: A, + factory: Box, + ) -> Result { + Socket::bind_with_config(addresses, Config::default(), factory) } /// Bind to any local port on the system, if available - pub fn bind_any() -> Result { - Self::bind_any_with_config(Config::default()) + pub fn bind_any(factory: Box) -> Result { + Self::bind_any_with_config(Config::default(), factory) } /// Bind to any local port on the system, if available, with a given config - pub fn bind_any_with_config(config: Config) -> Result { + pub fn bind_any_with_config( + config: Config, + factory: Box, + ) -> Result { let loopback = Ipv4Addr::new(127, 0, 0, 1); let address = SocketAddrV4::new(loopback, 0); let socket = UdpSocket::bind(address)?; - Self::bind_internal(socket, config) + Self::bind_internal(socket, config, factory) } /// Binds to the socket and then sets up `ActiveConnections` to manage the "connections". @@ -60,55 +114,67 @@ impl Socket { /// endpoint by looking to see if they are still sending packets or not /// /// This function allows you to configure laminar with the passed configuration. - pub fn bind_with_config(addresses: A, config: Config) -> Result { + pub fn bind_with_config( + addresses: A, + config: Config, + factory: Box, + ) -> Result { let socket = UdpSocket::bind(addresses)?; - Self::bind_internal(socket, config) + Self::bind_internal(socket, config, factory) } - fn bind_internal(socket: UdpSocket, config: Config) -> Result { + fn bind_internal( + socket: UdpSocket, + config: Config, + factory: Box, + ) -> Result { socket.set_nonblocking(!config.blocking_mode)?; let (event_sender, event_receiver) = unbounded(); let (packet_sender, packet_receiver) = unbounded(); Ok(Socket { recv_buffer: vec![0; config.receive_buffer_max_size], - socket, + preprocess_buffer: vec![0; config.receive_buffer_max_size], + socket: SocketWithConditioner { + postprocess_buffer: Vec::with_capacity(config.receive_buffer_max_size), + socket, + link_conditioner: None, + }, config, connections: ActiveConnections::new(), - link_conditioner: None, event_sender, packet_receiver, sender: packet_sender, receiver: event_receiver, + metrics: MetricsCollector {}, + conn_manager_factory: factory, }) } /// Returns a handle to the packet sender which provides a thread-safe way to enqueue packets /// to be processed. This should be used when the socket is busy running its polling loop in a /// separate thread. - pub fn get_packet_sender(&mut self) -> Sender { + pub fn get_event_sender(&mut self) -> Sender> { self.sender.clone() } /// Returns a handle to the event receiver which provides a thread-safe way to retrieve events /// from the socket. This should be used when the socket is busy running its polling loop in /// a separate thread. - pub fn get_event_receiver(&mut self) -> Receiver { + pub fn get_event_receiver(&mut self) -> Receiver> { self.receiver.clone() } /// Send a packet - pub fn send(&mut self, packet: Packet) -> Result<()> { - match self.sender.send(packet) { + pub fn send(&mut self, event: ConnectionEvent) -> Result<()> { + match self.sender.send(event) { Ok(_) => Ok(()), - Err(error) => Err(ErrorKind::SendError(SendError(SocketEvent::Packet( - error.0, - )))), + Err(error) => Err(ErrorKind::from(error)), } } /// Receive a packet - pub fn recv(&mut self) -> Option { + pub fn recv(&mut self) -> Option> { match self.receiver.try_recv() { Ok(pkt) => Some(pkt), Err(TryRecvError::Empty) => None, @@ -142,185 +208,133 @@ impl Socket { match self.recv_from(time) { Ok(UdpSocketState::MaybeMore) => continue, Ok(UdpSocketState::MaybeEmpty) => break, - Err(e) => error!("Encountered an error receiving data: {:?}", e), + Err(ref e) => self + .metrics + .track_global_error(e, "receiving data from socket"), } } // Now grab all the packets waiting to be sent and send them while let Ok(p) = self.packet_receiver.try_recv() { - if let Err(e) = self.send_to(p, time) { - match e { - ErrorKind::IOError(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} - _ => error!("There was an error sending packet: {:?}", e), - } - } + self.send_to(p, time); } - // Check for idle clients - if let Err(e) = self.handle_idle_clients(time) { - error!("Encountered an error when sending TimeoutEvent: {:?}", e); - } + self.connections.update_connections( + &self.event_sender, + &mut self.metrics, + &mut self.socket, + time, + &mut self.preprocess_buffer, + ); - // Handle any dead clients - self.handle_dead_clients().expect("Internal laminar error"); + // Check for all dead connections + self.connections.handle_dead_clients( + time, + self.config.idle_connection_timeout, + &self.event_sender, + &mut self.metrics, + ); // Finally send heartbeat packets to connections that require them, if enabled if let Some(heartbeat_interval) = self.config.heartbeat_interval { - if let Err(e) = self.send_heartbeat_packets(heartbeat_interval, time) { - match e { - ErrorKind::IOError(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} - _ => error!("There was an error sending a heartbeat packet: {:?}", e), - } - } + self.connections.handle_heartbeat( + time, + heartbeat_interval, + &mut self.socket, + &mut self.metrics, + ) } } /// Set the link conditioner for this socket. See [LinkConditioner] for further details. pub fn set_link_conditioner(&mut self, link_conditioner: Option) { - self.link_conditioner = link_conditioner; + self.socket.link_conditioner = link_conditioner; } /// Get the local socket address pub fn local_addr(&self) -> Result { - Ok(self.socket.local_addr()?) - } - - /// Iterate through the dead connections and disconnect them by removing them from the - /// connection map while informing the user of this by sending an event. - fn handle_dead_clients(&mut self) -> Result<()> { - let dead_addresses = self.connections.dead_connections(); - for address in dead_addresses { - self.connections.remove_connection(&address); - self.event_sender.send(SocketEvent::Timeout(address))?; - } - - Ok(()) - } - - /// Iterate through all of the idle connections based on `idle_connection_timeout` config and - /// remove them from the active connections. For each connection removed, we will send a - /// `SocketEvent::TimeOut` event to the `event_sender` channel. - fn handle_idle_clients(&mut self, time: Instant) -> Result<()> { - let idle_addresses = self - .connections - .idle_connections(self.config.idle_connection_timeout, time); - for address in idle_addresses { - self.connections.remove_connection(&address); - self.event_sender.send(SocketEvent::Timeout(address))?; - } - - Ok(()) - } - - /// Iterate over all connections which have not sent a packet for a duration of at least - /// `heartbeat_interval` (from config), and send a heartbeat packet to each. - fn send_heartbeat_packets( - &mut self, - heartbeat_interval: Duration, - time: Instant, - ) -> Result { - let heartbeat_packets_and_addrs = self - .connections - .heartbeat_required_connections(heartbeat_interval, time) - .map(|connection| { - ( - connection.create_and_process_heartbeat(time), - connection.remote_address, - ) - }) - .collect::>(); - - let mut bytes_sent = 0; - - for (heartbeat_packet, address) in heartbeat_packets_and_addrs { - if self.should_send_packet() { - bytes_sent += self.send_packet(&address, &heartbeat_packet.contents())?; - } - } - - Ok(bytes_sent) + Ok(self.socket.socket.local_addr()?) } // Serializes and sends a `Packet` on the socket. On success, returns the number of bytes written. - fn send_to(&mut self, packet: Packet, time: Instant) -> Result { - let connection = - self.connections - .get_or_insert_connection(packet.addr(), &self.config, time); - - let dropped = connection.gather_dropped_packets(); - let mut processed_packets: Vec = dropped - .iter() - .flat_map(|waiting_packet| { - connection.process_outgoing( - &waiting_packet.payload, - // Because a delivery guarantee is only sent with reliable packets - DeliveryGuarantee::Reliable, - // This is stored with the dropped packet because they could be mixed - waiting_packet.ordering_guarantee, - waiting_packet.item_identifier, - time, - ) - }) - .collect(); - - let processed_packet = connection.process_outgoing( - packet.payload(), - packet.delivery_guarantee(), - packet.order_guarantee(), - None, - time, - )?; - - processed_packets.push(processed_packet); - - let mut bytes_sent = 0; + fn send_to(&mut self, event: ConnectionEvent, time: Instant) { + let connection = if let Some(connection) = self.connections.try_get(&event.0) { + Some(connection) + } else { + let conn = self.connections.insert_and_init_connection( + VirtualConnection::new( + event.0, + ReliabilitySystem::new(&self.config, time), + self.conn_manager_factory + .create_local_connection_manager(&event.0), + ), + &mut self.socket, + &self.event_sender, + &mut self.metrics, + time, + &mut self.preprocess_buffer, + ); + Some(conn) + }; - for processed_packet in processed_packets { - if self.should_send_packet() { - match processed_packet { - Outgoing::Packet(outgoing) => { - bytes_sent += self.send_packet(&packet.addr(), &outgoing.contents())?; - } - Outgoing::Fragments(packets) => { - for outgoing in packets { - bytes_sent += self.send_packet(&packet.addr(), &outgoing.contents())?; - } - } + if let Some(connection) = connection { + match (event.1, connection.get_current_state()) { + (SendEvent::Packet(packet), ConnectionState::Connected(_)) => { + // TODO maybe these should not depend on send_to method? + // Maybe it should be extracted to separate method and added to `manual_poll` function. + connection.resend_dropped_packets(time, &mut self.socket, &mut self.metrics); + connection.process_outgoing(&packet, time, &mut self.socket, &mut self.metrics); } - } + (SendEvent::Connect(data), ConnectionState::Connecting) => connection.connect(data), + (SendEvent::Disconnect, _) => connection.disconnect(), + _ => {} // ignore all other combinations + }; } - Ok(bytes_sent) } // On success the packet will be sent on the `event_sender` fn recv_from(&mut self, time: Instant) -> Result { - match self.socket.recv_from(&mut self.recv_buffer) { + match self.socket.socket.recv_from(&mut self.recv_buffer) { Ok((recv_len, address)) => { if recv_len == 0 { - return Err(ErrorKind::ReceivedDataToShort)?; + return Err(ErrorKind::ReceivedDataToShort); } let received_payload = &self.recv_buffer[..recv_len]; - if !self.connections.exists(&address) { - self.event_sender.send(SocketEvent::Connect(address))?; - } - - let connection = - self.connections - .get_or_create_connection(address, &self.config, time); - - match connection { - Left(existing) => { - existing.process_incoming(received_payload, &self.event_sender, time)?; - } - Right(mut anonymous) => { - anonymous.process_incoming(received_payload, &self.event_sender, time)?; - } + let connection = if let Some(conn) = self.connections.try_get(&address) { + Some(conn) + } else { + let conn = self.connections.insert_and_init_connection( + VirtualConnection::new( + address, + ReliabilitySystem::new(&self.config, time), + self.conn_manager_factory + .create_remote_connection_manager(&address, received_payload), + ), + &mut self.socket, + &self.event_sender, + &mut self.metrics, + time, + &mut self.preprocess_buffer, + ); + Some(conn) + }; + + if let Some(conn) = connection { + self.metrics.track_received_bytes(&address, recv_len); + conn.process_incoming( + received_payload, + &mut self.preprocess_buffer, + time, + &self.event_sender, + &mut self.metrics, + )?; + } else { + self.metrics.track_ignored_bytes(&address, recv_len); } } Err(e) => { if e.kind() != io::ErrorKind::WouldBlock { - error!("Encountered an error receiving data: {:?}", e); return Err(e.into()); } else { return Ok(UdpSocketState::MaybeEmpty); @@ -335,22 +349,6 @@ impl Socket { } } - // Send a single packet over the UDP socket. - fn send_packet(&self, addr: &SocketAddr, payload: &[u8]) -> Result { - let bytes_sent = self.socket.send_to(payload, addr)?; - Ok(bytes_sent) - } - - // In the presence of a link conditioner, we would like it to determine whether or not we should - // send a packet. - fn should_send_packet(&mut self) -> bool { - if let Some(link_conditioner) = &mut self.link_conditioner { - link_conditioner.should_send() - } else { - true - } - } - #[cfg(test)] fn connection_count(&self) -> usize { self.connections.count() @@ -439,7 +437,7 @@ mod tests { let time = Instant::now(); - let sender = client.get_packet_sender(); + let sender = client.get_event_sender(); let receiver = server.get_event_receiver(); sender diff --git a/src/net/virtual_connection.rs b/src/net/virtual_connection.rs index a56f5b21..da518be3 100644 --- a/src/net/virtual_connection.rs +++ b/src/net/virtual_connection.rs @@ -1,19 +1,12 @@ use crate::{ - config::Config, - error::{ErrorKind, PacketErrorKind, Result}, - infrastructure::{ - arranging::{Arranging, ArrangingSystem, OrderingSystem, SequencingSystem}, - AcknowledgmentHandler, CongestionHandler, Fragmentation, SentPacket, + error::{ErrorKind, Result}, + net::events::{ConnectionEvent, DestroyReason, DisconnectReason, ReceiveEvent}, + net::managers::{ + ConnectionManager, ConnectionManagerError, ConnectionManagerEvent, ConnectionState, }, - net::constants::{ - ACKED_PACKET_HEADER, DEFAULT_ORDERING_STREAM, DEFAULT_SEQUENCING_STREAM, - STANDARD_HEADER_SIZE, - }, - packet::{ - DeliveryGuarantee, OrderingGuarantee, Outgoing, OutgoingPacket, OutgoingPacketBuilder, - Packet, PacketReader, PacketType, SequenceNumber, - }, - SocketEvent, + net::{MetricsCollector, SocketWithConditioner}, + net::{OutgoingPackets, ReliabilitySystem}, + packet::{GenericPacket, Packet, PacketType}, }; use crossbeam_channel::{self, Sender}; @@ -22,408 +15,256 @@ use std::net::SocketAddr; use std::time::{Duration, Instant}; /// Contains the information about a certain 'virtual connection' over udp. -/// This connections also keeps track of network quality, processing packets, buffering data related to connection etc. +/// This connections has core components that manages connection state, and has reliability information. pub struct VirtualConnection { - /// Last time we received a packet from this client - pub last_heard: Instant, - /// Last time we sent a packet to this client - pub last_sent: Instant, - /// The address of the remote endpoint - pub remote_address: SocketAddr, - - ordering_system: OrderingSystem>, - sequencing_system: SequencingSystem>, - acknowledge_handler: AcknowledgmentHandler, - congestion_handler: CongestionHandler, - - config: Config, - fragmentation: Fragmentation, + remote_address: SocketAddr, + reliability_system: ReliabilitySystem, + connection_manager: Box, + current_state: ConnectionState, } impl VirtualConnection { - /// Creates and returns a new Connection that wraps the provided socket address - pub fn new(addr: SocketAddr, config: &Config, time: Instant) -> VirtualConnection { - VirtualConnection { - last_heard: time, - last_sent: time, - remote_address: addr, - ordering_system: OrderingSystem::new(), - sequencing_system: SequencingSystem::new(), - acknowledge_handler: AcknowledgmentHandler::new(), - congestion_handler: CongestionHandler::new(config), - fragmentation: Fragmentation::new(config), - config: config.to_owned(), + /// Creates new VirtualConnection. + pub fn new( + remote_address: SocketAddr, + reliability_system: ReliabilitySystem, + connection_manager: Box, + ) -> VirtualConnection { + Self { + remote_address, + reliability_system, + connection_manager, + current_state: Default::default(), } } - /// Determine if this connection should be dropped due to its state - pub fn should_be_dropped(&self) -> bool { - self.acknowledge_handler.packets_in_flight() > self.config.max_packets_in_flight + /// Returns connection remote address. + pub fn remote_address(&self) -> SocketAddr { + self.remote_address } - /// Returns a [Duration] representing the interval since we last heard from the client - pub fn last_heard(&self, time: Instant) -> Duration { - // TODO: Replace with saturating_duration_since once it becomes stable. - // This function panics if the user supplies a time instant earlier than last_heard. - time.duration_since(self.last_heard) + /// Invokes connect request on ConnectionManager. + pub fn connect(&mut self, payload: Box<[u8]>) { + self.connection_manager.connect(payload); } - /// Returns a [Duration] representing the interval since we last sent to the client - pub fn last_sent(&self, time: Instant) -> Duration { - // TODO: Replace with saturating_duration_since once it becomes stable. - // This function panics if the user supplies a time instant earlier than last_heard. - time.duration_since(self.last_sent) + /// Invokes disconnect request on ConnectionManager. + pub fn disconnect(&mut self) { + self.connection_manager.disconnect(); } - /// This will create a heartbeat packet that is expected to be sent over the network - pub fn create_and_process_heartbeat(&mut self, time: Instant) -> OutgoingPacket<'static> { - self.last_sent = time; - self.congestion_handler - .process_outgoing(self.acknowledge_handler.local_sequence_num(), time); + /// Returns current connection state: `Connecting`, `Connected` or `Disconnected`. + pub fn get_current_state(&self) -> &ConnectionState { + &self.current_state + } - OutgoingPacketBuilder::new(&[]) - .with_default_header( - PacketType::Heartbeat, - DeliveryGuarantee::Unreliable, - OrderingGuarantee::None, - ) - .build() + /// Returns whether connection should be dropped, and provides a drop reason. + pub fn should_be_dropped( + &self, + max_idle_time: Duration, + time: Instant, + ) -> Option { + if self.reliability_system.should_be_dropped() { + Some(DestroyReason::TooManyPacketsInFlight) + } else if let ConnectionState::Disconnected(_) = self.current_state { + Some(DestroyReason::GracefullyDisconnected) + } else if self.reliability_system.last_heard(time) >= max_idle_time { + Some(DestroyReason::Timeout) + } else { + None + } } - /// This will pre-process the given buffer to be sent over the network. - pub fn process_outgoing<'a>( + /// Checks if connection should send heartbeat packet and sends it. + pub fn handle_heartbeat( &mut self, - payload: &'a [u8], - delivery_guarantee: DeliveryGuarantee, - ordering_guarantee: OrderingGuarantee, - last_item_identifier: Option, time: Instant, - ) -> Result> { - match delivery_guarantee { - DeliveryGuarantee::Unreliable => { - if payload.len() <= self.config.receive_buffer_max_size { - let mut builder = OutgoingPacketBuilder::new(payload).with_default_header( - PacketType::Packet, - delivery_guarantee, - ordering_guarantee, - ); - - if let OrderingGuarantee::Sequenced(stream_id) = ordering_guarantee { - let item_identifier = self - .sequencing_system - .get_or_create_stream(stream_id.unwrap_or(DEFAULT_SEQUENCING_STREAM)) - .new_item_identifier(); + heartbeat_interval: Duration, + socket: &mut SocketWithConditioner, + metrics: &mut MetricsCollector, + ) { + if self.reliability_system.last_sent(time) >= heartbeat_interval { + send_packets( + &self.remote_address, + self.reliability_system + .process_outgoing(GenericPacket::heartbeat_packet(&[]), time), + self.connection_manager.as_mut(), + socket, + metrics, + "sending heartbeat packet", + ); + } + } - builder = builder.with_sequencing_header(item_identifier as u16, stream_id); - }; + /// Resends all dropped packets. + pub fn resend_dropped_packets( + &mut self, + time: Instant, + socket: &mut SocketWithConditioner, + metrics: &mut MetricsCollector, + ) { + for dropped in self.reliability_system.gather_dropped_packets() { + send_packets( + &self.remote_address, + self.reliability_system.process_dropped(&dropped, time), + self.connection_manager.as_mut(), + socket, + metrics, + "sending dropped packet", + ); + } + } - Ok(Outgoing::Packet(builder.build())) - } else { - Err(ErrorKind::PacketError( - PacketErrorKind::ExceededMaxPacketSize, - )) - } - } - DeliveryGuarantee::Reliable => { - let payload_length = payload.len() as u16; + /// Processes outgoing packet, and sends it. + pub fn process_outgoing( + &mut self, + packet: &Packet, + time: Instant, + socket: &mut SocketWithConditioner, + metrics: &mut MetricsCollector, + ) { + let packets = self.reliability_system.process_outgoing( + GenericPacket { + packet_type: PacketType::Packet, + payload: packet.payload(), + delivery: packet.delivery_guarantee(), + ordering: packet.order_guarantee(), + }, + time, + ); - let mut item_identifier_value = None; - let outgoing = { - // spit the packet if the payload length is greater than the allowed fragment size. - if payload_length <= self.config.fragment_size { - let mut builder = OutgoingPacketBuilder::new(payload).with_default_header( - PacketType::Packet, - delivery_guarantee, - ordering_guarantee, - ); + send_packets( + &self.remote_address, + packets, + self.connection_manager.as_mut(), + socket, + metrics, + "sending outgoing packet", + ); + } - builder = builder.with_acknowledgment_header( - self.acknowledge_handler.local_sequence_num(), - self.acknowledge_handler.remote_sequence_num(), - self.acknowledge_handler.ack_bitfield(), + /// Processes incoming bytes by converting to packets and process them. + pub fn process_incoming( + &mut self, + received_payload: &[u8], + mut tmp_buffer: &mut [u8], + time: Instant, + event_sender: &Sender>, + metrics: &mut MetricsCollector, + ) -> Result<()> { + let received_payload = self + .connection_manager + .preprocess_incoming(received_payload, &mut tmp_buffer)?; + let packets = self.reliability_system.process_incoming( + self.remote_address, + received_payload, + time, + )?; + for (packet, packet_type) in packets { + if packet_type != PacketType::Connection { + if let ConnectionState::Connected(_) = self.current_state { + if let Err(err) = event_sender.send(ConnectionEvent( + self.remote_address, + ReceiveEvent::Packet(packet), + )) { + metrics.track_connection_error( + &self.remote_address, + &ErrorKind::from(err), + "sending incoming packet", ); - - if let OrderingGuarantee::Ordered(stream_id) = ordering_guarantee { - let item_identifier = - if let Some(item_identifier) = last_item_identifier { - item_identifier - } else { - self.ordering_system - .get_or_create_stream( - stream_id.unwrap_or(DEFAULT_ORDERING_STREAM), - ) - .new_item_identifier() - }; - - item_identifier_value = Some(item_identifier); - - builder = builder.with_ordering_header(item_identifier, stream_id); - }; - - if let OrderingGuarantee::Sequenced(stream_id) = ordering_guarantee { - let item_identifier = - if let Some(item_identifier) = last_item_identifier { - item_identifier - } else { - self.sequencing_system - .get_or_create_stream( - stream_id.unwrap_or(DEFAULT_SEQUENCING_STREAM), - ) - .new_item_identifier() - }; - - item_identifier_value = Some(item_identifier); - - builder = builder.with_sequencing_header(item_identifier, stream_id); - }; - - Outgoing::Packet(builder.build()) - } else { - Outgoing::Fragments( - Fragmentation::spit_into_fragments(payload, &self.config)? - .into_iter() - .enumerate() - .map(|(fragment_id, fragment)| { - let fragments_needed = Fragmentation::fragments_needed( - payload_length, - self.config.fragment_size, - ) - as u8; - - let mut builder = OutgoingPacketBuilder::new(fragment) - .with_default_header( - PacketType::Fragment, - delivery_guarantee, - ordering_guarantee, - ); - - builder = builder.with_fragment_header( - self.acknowledge_handler.local_sequence_num(), - fragment_id as u8, - fragments_needed, - ); - - if fragment_id == 0 { - builder = builder.with_acknowledgment_header( - self.acknowledge_handler.local_sequence_num(), - self.acknowledge_handler.remote_sequence_num(), - self.acknowledge_handler.ack_bitfield(), - ); - } - - builder.build() - }) - .collect(), - ) } - }; - - self.last_sent = time; - self.congestion_handler - .process_outgoing(self.acknowledge_handler.local_sequence_num(), time); - self.acknowledge_handler.process_outgoing( - payload, - ordering_guarantee, - item_identifier_value, + } + } else if let Err(err) = self + .connection_manager + .process_protocol_data(packet.payload()) + { + metrics.track_connection_error( + &self.remote_address, + &ErrorKind::from(err), + "processing connection manager data", ); - - Ok(outgoing) } } + Ok(()) } - /// This processes the incoming data and returns a packet if the data is complete. - pub fn process_incoming( + /// Calls `update` method for `ConnectionManager`, in the loop, until it returns None + /// These updates returns either new packets to be sent, or connection state changes. + pub fn update_connection_manager( &mut self, - received_data: &[u8], - sender: &Sender, + sender: &Sender>, + metrics: &mut MetricsCollector, + socket: &mut SocketWithConditioner, time: Instant, - ) -> crate::Result<()> { - self.last_heard = time; - - let mut packet_reader = PacketReader::new(received_data); - - let header = packet_reader.read_standard_header()?; - - if !header.is_current_protocol() { - return Err(ErrorKind::ProtocolVersionMismatch); - } - - if header.is_heartbeat() { - // Heartbeat packets are unreliable, unordered and empty packets. - // We already updated our `self.last_heard` time, nothing else to be done. - return Ok(()); - } - - match header.delivery_guarantee() { - DeliveryGuarantee::Unreliable => { - if let OrderingGuarantee::Sequenced(_id) = header.ordering_guarantee() { - let arranging_header = - packet_reader.read_arranging_header(u16::from(STANDARD_HEADER_SIZE))?; - - let payload = packet_reader.read_payload(); - - let stream = self - .sequencing_system - .get_or_create_stream(arranging_header.stream_id()); - - if let Some(packet) = stream.arrange(arranging_header.arranging_id(), payload) { - Self::queue_packet( - sender, - packet, - self.remote_address, - header.delivery_guarantee(), - OrderingGuarantee::Sequenced(Some(arranging_header.stream_id())), - )?; - } - - return Ok(()); + buffer: &mut [u8], + ) { + while let Some(changes) = self.connection_manager.update(buffer, time) { + match changes { + ConnectionManagerEvent::NewPacket(packet) => { + send_packets( + &self.remote_address, + self.reliability_system.process_outgoing(packet, time), + self.connection_manager.as_mut(), + socket, + metrics, + "sending packet from connection manager", + ); } - - Self::queue_packet( - sender, - packet_reader.read_payload(), - self.remote_address, - header.delivery_guarantee(), - header.ordering_guarantee(), - )?; - } - DeliveryGuarantee::Reliable => { - if header.is_fragment() { - if let Ok((fragment_header, acked_header)) = packet_reader.read_fragment() { - let payload = packet_reader.read_payload(); - - match self - .fragmentation - .handle_fragment(fragment_header, &payload) - { - Ok(Some(payload)) => { - Self::queue_packet( - sender, - payload.into_boxed_slice(), - self.remote_address, - header.delivery_guarantee(), - OrderingGuarantee::None, - )?; - } - Ok(None) => return Ok(()), - Err(e) => return Err(e), - }; - - if let Some(acked_header) = acked_header { - self.congestion_handler - .process_incoming(acked_header.sequence()); - self.acknowledge_handler.process_incoming( - acked_header.sequence(), - acked_header.ack_seq(), - acked_header.ack_field(), - ); - } - } - } else { - let acked_header = packet_reader.read_acknowledge_header()?; - - if let OrderingGuarantee::Sequenced(_) = header.ordering_guarantee() { - let arranging_header = packet_reader.read_arranging_header(u16::from( - STANDARD_HEADER_SIZE + ACKED_PACKET_HEADER, - ))?; - - let payload = packet_reader.read_payload(); - - let stream = self - .sequencing_system - .get_or_create_stream(arranging_header.stream_id()); - - if let Some(packet) = - stream.arrange(arranging_header.arranging_id(), payload) - { - Self::queue_packet( - sender, - packet, - self.remote_address, - header.delivery_guarantee(), - OrderingGuarantee::Sequenced(Some(arranging_header.stream_id())), - )?; - } - } else if let OrderingGuarantee::Ordered(_id) = header.ordering_guarantee() { - let arranging_header = packet_reader.read_arranging_header(u16::from( - STANDARD_HEADER_SIZE + ACKED_PACKET_HEADER, - ))?; - - let payload = packet_reader.read_payload(); - - let stream = self - .ordering_system - .get_or_create_stream(arranging_header.stream_id()); - - if let Some(packet) = - stream.arrange(arranging_header.arranging_id(), payload) - { - Self::queue_packet( - sender, - packet, + ConnectionManagerEvent::NewState(state) => { + if let Some(old) = self.current_state.try_change(&state) { + if let Err(err) = match &self.current_state { + ConnectionState::Connected(data) => sender.send(ConnectionEvent( self.remote_address, - header.delivery_guarantee(), - OrderingGuarantee::Ordered(Some(arranging_header.stream_id())), - )?; - - while let Some(packet) = stream.iter_mut().next() { - Self::queue_packet( - sender, - packet, + ReceiveEvent::Connected(data.clone()), + )), + ConnectionState::Disconnected(closed_by) => { + sender.send(ConnectionEvent( self.remote_address, - header.delivery_guarantee(), - OrderingGuarantee::Ordered(Some(arranging_header.stream_id())), - )?; + ReceiveEvent::Disconnected(DisconnectReason::ClosedBy( + closed_by.clone(), + )), + )) + } + _ => { + metrics.track_connection_error( + &self.remote_address, + &ErrorKind::ConnectionError(ConnectionManagerError::Fatal( + format!( + "Invalid state transition: {:?} -> {:?}", + old, self.current_state + ), + )), + "changing connection manager state", + ); + Ok(()) } + } { + metrics.track_connection_error( + &self.remote_address, + &ErrorKind::from(err), + "sending connection state update", + ); } } else { - let payload = packet_reader.read_payload(); - - Self::queue_packet( - sender, - payload, - self.remote_address, - header.delivery_guarantee(), - header.ordering_guarantee(), - )?; + metrics.track_connection_error( + &self.remote_address, + &ErrorKind::ConnectionError(ConnectionManagerError::Fatal(format!( + "Invalid state transition: {:?} -> {:?}", + self.current_state, state + ))), + "changing connection manager state", + ); } - - self.congestion_handler - .process_incoming(acked_header.sequence()); - self.acknowledge_handler.process_incoming( - acked_header.sequence(), - acked_header.ack_seq(), - acked_header.ack_field(), + } + ConnectionManagerEvent::Error(err) => { + metrics.track_connection_error( + &self.remote_address, + &ErrorKind::ConnectionError(err), + "recieved connection manager error", ); } - } + }; } - - Ok(()) - } - - fn queue_packet( - tx: &Sender, - payload: Box<[u8]>, - remote_addr: SocketAddr, - delivery: DeliveryGuarantee, - ordering: OrderingGuarantee, - ) -> Result<()> { - tx.send(SocketEvent::Packet(Packet::new( - remote_addr, - payload, - delivery, - ordering, - )))?; - Ok(()) - } - - /// This will gather dropped packets from the acknowledgment handler. - /// - /// Note that after requesting dropped packets the dropped packets will be removed from this client. - pub fn gather_dropped_packets(&mut self) -> Vec { - self.acknowledge_handler.dropped_packets() } } @@ -438,465 +279,27 @@ impl fmt::Debug for VirtualConnection { } } -#[cfg(test)] -mod tests { - use super::VirtualConnection; - use crate::config::Config; - use crate::net::constants; - use crate::packet::header::{AckedPacketHeader, ArrangingHeader, HeaderWriter, StandardHeader}; - use crate::packet::{DeliveryGuarantee, OrderingGuarantee, Outgoing, Packet, PacketType}; - use crate::protocol_version::ProtocolVersion; - use crate::SocketEvent; - use byteorder::{BigEndian, WriteBytesExt}; - use crossbeam_channel::{unbounded, TryRecvError}; - use std::io::Write; - use std::time::Instant; - - const PAYLOAD: [u8; 4] = [1, 2, 3, 4]; - - #[test] - fn assure_right_fragmentation() { - let mut protocol_version = Vec::new(); - protocol_version - .write_u16::(ProtocolVersion::get_crc16()) - .unwrap(); - - let standard_header = [protocol_version, vec![1, 1, 2]].concat(); - - let acked_header = vec![1, 0, 0, 2, 0, 0, 0, 3]; - let first_fragment = vec![0, 1, 1, 3]; - let second_fragment = vec![0, 1, 2, 3]; - let third_fragment = vec![0, 1, 3, 3]; - - let (tx, rx) = unbounded::(); - - let mut connection = create_virtual_connection(); - connection - .process_incoming( - [standard_header.as_slice(), acked_header.as_slice()] - .concat() - .as_slice(), - &tx, - Instant::now(), - ) - .unwrap(); - assert!(rx.try_recv().is_err()); - connection - .process_incoming( - [ - standard_header.as_slice(), - first_fragment.as_slice(), - &PAYLOAD, - ] - .concat() - .as_slice(), - &tx, - Instant::now(), - ) - .unwrap(); - assert!(rx.try_recv().is_err()); - connection - .process_incoming( - [ - standard_header.as_slice(), - second_fragment.as_slice(), - &PAYLOAD, - ] - .concat() - .as_slice(), - &tx, - Instant::now(), - ) - .unwrap(); - assert!(rx.try_recv().is_err()); - connection - .process_incoming( - [ - standard_header.as_slice(), - third_fragment.as_slice(), - &PAYLOAD, - ] - .concat() - .as_slice(), - &tx, - Instant::now(), - ) - .unwrap(); - - let complete_fragment = rx.try_recv().unwrap(); - - match complete_fragment { - SocketEvent::Packet(fragment) => assert_eq!( - fragment.payload(), - &*[PAYLOAD, PAYLOAD, PAYLOAD].concat().into_boxed_slice() - ), - _ => { - panic!("Expected fragment other result."); - } - } - } - - #[test] - fn expect_fragmentation() { - let mut connection = create_virtual_connection(); - - let buffer = vec![1; 4000]; - - let outgoing = connection - .process_outgoing( - &buffer, - DeliveryGuarantee::Reliable, - OrderingGuarantee::Ordered(None), - None, - Instant::now(), - ) - .unwrap(); - - match outgoing { - Outgoing::Packet(_) => panic!("Expected fragment got packet"), - Outgoing::Fragments(fragments) => { - assert_eq!(fragments.len(), 4); - } - } - } - - #[test] - fn assure_correct_outgoing_processing() { - let mut connection = create_virtual_connection(); - - let buffer = vec![1; 1000]; - - connection - .process_outgoing( - &buffer, - DeliveryGuarantee::Unreliable, - OrderingGuarantee::None, - None, - Instant::now(), - ) - .unwrap(); - - connection - .process_outgoing( - &buffer, - DeliveryGuarantee::Unreliable, - OrderingGuarantee::Sequenced(None), - None, - Instant::now(), - ) - .unwrap(); - - connection - .process_outgoing( - &buffer, - DeliveryGuarantee::Reliable, - OrderingGuarantee::Ordered(None), - None, - Instant::now(), - ) - .unwrap(); - - connection - .process_outgoing( - &buffer, - DeliveryGuarantee::Reliable, - OrderingGuarantee::Sequenced(None), - None, - Instant::now(), - ) - .unwrap(); - } - - #[test] - fn assure_right_sequencing() { - let mut connection = create_virtual_connection(); - - assert_incoming_with_order( - DeliveryGuarantee::Unreliable, - OrderingGuarantee::Sequenced(Some(1)), - &mut connection, - Ok(SocketEvent::Packet(Packet::unreliable_sequenced( - get_fake_addr(), - PAYLOAD.to_vec(), - Some(1), - ))), - 1, - ); - - assert_incoming_with_order( - DeliveryGuarantee::Unreliable, - OrderingGuarantee::Sequenced(Some(1)), - &mut connection, - Ok(SocketEvent::Packet(Packet::unreliable_sequenced( - get_fake_addr(), - PAYLOAD.to_vec(), - Some(1), - ))), - 3, - ); - - assert_incoming_with_order( - DeliveryGuarantee::Unreliable, - OrderingGuarantee::Sequenced(Some(1)), - &mut connection, - Err(TryRecvError::Empty), - 2, - ); - - assert_incoming_with_order( - DeliveryGuarantee::Unreliable, - OrderingGuarantee::Sequenced(Some(1)), - &mut connection, - Ok(SocketEvent::Packet(Packet::unreliable_sequenced( - get_fake_addr(), - PAYLOAD.to_vec(), - Some(1), - ))), - 4, - ); - - assert_incoming_with_order( - DeliveryGuarantee::Reliable, - OrderingGuarantee::Sequenced(Some(1)), - &mut connection, - Ok(SocketEvent::Packet(Packet::reliable_sequenced( - get_fake_addr(), - PAYLOAD.to_vec(), - Some(1), - ))), - 5, - ); - } - - #[test] - fn assure_right_ordering() { - let mut connection = create_virtual_connection(); - - assert_incoming_with_order( - DeliveryGuarantee::Reliable, - OrderingGuarantee::Ordered(Some(1)), - &mut connection, - Ok(SocketEvent::Packet(Packet::reliable_ordered( - get_fake_addr(), - PAYLOAD.to_vec(), - Some(1), - ))), - 0, - ); - - assert_incoming_with_order( - DeliveryGuarantee::Reliable, - OrderingGuarantee::Ordered(Some(1)), - &mut connection, - Err(TryRecvError::Empty), - 2, - ); - - assert_incoming_with_order( - DeliveryGuarantee::Reliable, - OrderingGuarantee::Ordered(Some(1)), - &mut connection, - Err(TryRecvError::Empty), - 3, - ); - - assert_incoming_with_order( - DeliveryGuarantee::Reliable, - OrderingGuarantee::Ordered(Some(1)), - &mut connection, - Ok(SocketEvent::Packet(Packet::reliable_ordered( - get_fake_addr(), - PAYLOAD.to_vec(), - Some(1), - ))), - 1, - ); - } - - #[test] - fn assure_correct_processing_of_incoming() { - let mut connection = create_virtual_connection(); - - assert_incoming_without_order( - DeliveryGuarantee::Unreliable, - &mut connection, - SocketEvent::Packet(Packet::unreliable(get_fake_addr(), PAYLOAD.to_vec())), - ); - - assert_incoming_without_order( - DeliveryGuarantee::Reliable, - &mut connection, - SocketEvent::Packet(Packet::reliable_unordered( - get_fake_addr(), - PAYLOAD.to_vec(), - )), - ); - - assert_incoming_with_order( - DeliveryGuarantee::Unreliable, - OrderingGuarantee::Sequenced(Some(1)), - &mut connection, - Ok(SocketEvent::Packet(Packet::unreliable_sequenced( - get_fake_addr(), - PAYLOAD.to_vec(), - Some(1), - ))), - 1, - ); - - assert_incoming_with_order( - DeliveryGuarantee::Reliable, - OrderingGuarantee::Ordered(Some(1)), - &mut connection, - Ok(SocketEvent::Packet(Packet::reliable_ordered( - get_fake_addr(), - PAYLOAD.to_vec(), - Some(1), - ))), - 0, - ); - } - - #[test] - fn assure_right_header_size() { - assert_right_header_size( - DeliveryGuarantee::Unreliable, - OrderingGuarantee::None, - (constants::STANDARD_HEADER_SIZE) as usize, - ); - assert_right_header_size( - DeliveryGuarantee::Unreliable, - OrderingGuarantee::Sequenced(None), - (constants::STANDARD_HEADER_SIZE + constants::ARRANGING_PACKET_HEADER) as usize, - ); - assert_right_header_size( - DeliveryGuarantee::Reliable, - OrderingGuarantee::None, - (constants::STANDARD_HEADER_SIZE + constants::ACKED_PACKET_HEADER) as usize, - ); - assert_right_header_size( - DeliveryGuarantee::Reliable, - OrderingGuarantee::Ordered(None), - (constants::STANDARD_HEADER_SIZE - + constants::ACKED_PACKET_HEADER - + constants::ARRANGING_PACKET_HEADER) as usize, - ); - } - - /// ======= helper functions ========= - fn create_virtual_connection() -> VirtualConnection { - VirtualConnection::new(get_fake_addr(), &Config::default(), Instant::now()) - } - - fn get_fake_addr() -> std::net::SocketAddr { - "127.0.0.1:0".parse().unwrap() - } - - // assert that the processing of the given `DeliveryGuarantee` and `OrderingGuarantee` results into the given `result_event` - fn assert_incoming_with_order( - delivery: DeliveryGuarantee, - ordering: OrderingGuarantee, - connection: &mut VirtualConnection, - result_event: Result, - order_id: u16, - ) { - let mut packet = Vec::new(); - - // configure the right header based on specified guarantees. - let header = StandardHeader::new(delivery, ordering, PacketType::Packet); - header.parse(&mut packet).unwrap(); - - if let OrderingGuarantee::Sequenced(val) = ordering { - if delivery == DeliveryGuarantee::Reliable { - let ack_header = AckedPacketHeader::new(1, 2, 3); - ack_header.parse(&mut packet).unwrap(); - } - - let order_header = ArrangingHeader::new(order_id, val.unwrap()); - order_header.parse(&mut packet).unwrap(); - } - - if let OrderingGuarantee::Ordered(val) = ordering { - if delivery == DeliveryGuarantee::Reliable { - let ack_header = AckedPacketHeader::new(1, 2, 3); - let order_header = ArrangingHeader::new(order_id, val.unwrap()); - ack_header.parse(&mut packet).unwrap(); - order_header.parse(&mut packet).unwrap(); - } - } - - if let OrderingGuarantee::None = ordering { - if delivery == DeliveryGuarantee::Reliable { - let ack_header = AckedPacketHeader::new(1, 2, 3); - ack_header.parse(&mut packet).unwrap(); - } - } - - packet.write_all(&PAYLOAD).unwrap(); - - let (tx, rx) = unbounded::(); - - connection - .process_incoming(packet.as_slice(), &tx, Instant::now()) - .unwrap(); - - let event = rx.try_recv(); - - match event { - Ok(val) => assert_eq!(val, result_event.unwrap()), - Err(e) => assert_eq!(e, result_event.err().unwrap()), - } - } - - // assert that the given `DeliveryGuarantee` results into the given `SocketEvent` after processing. - fn assert_incoming_without_order( - delivery: DeliveryGuarantee, - connection: &mut VirtualConnection, - result_event: SocketEvent, - ) { - let mut packet = Vec::new(); - - // configure the right header based on specified guarantees. - let header = StandardHeader::new(delivery, OrderingGuarantee::None, PacketType::Packet); - header.parse(&mut packet).unwrap(); - - if delivery == DeliveryGuarantee::Reliable { - let ack_header = AckedPacketHeader::new(1, 2, 3); - ack_header.parse(&mut packet).unwrap(); - } - - packet.write_all(&PAYLOAD).unwrap(); - - let (tx, rx) = unbounded::(); - - connection - .process_incoming(packet.as_slice(), &tx, Instant::now()) - .unwrap(); - - let event = rx.try_recv(); - - assert_eq!(event, Ok(result_event)); - } - - // assert that the size of the processed header is the same as the given one. - fn assert_right_header_size( - delivery: DeliveryGuarantee, - ordering: OrderingGuarantee, - expected_header_size: usize, - ) { - let mut connection = create_virtual_connection(); - - let buffer = vec![1; 500]; - - let outgoing = connection - .process_outgoing(&buffer, delivery, ordering, None, Instant::now()) - .unwrap(); - - match outgoing { - Outgoing::Packet(packet) => { - assert_eq!(packet.contents().len() - buffer.len(), expected_header_size); +// Helper method, that takes outgoing packets from reliability system and sends them. +fn send_packets( + address: &SocketAddr, + packets_result: Result, + connection_manager: &mut dyn ConnectionManager, + socket: &mut SocketWithConditioner, + metrics: &mut MetricsCollector, + err_context: &str, +) { + match packets_result { + Ok(packets) => { + for outgoing in packets { + socket.send_packet_and_log( + address, + connection_manager, + &outgoing.contents(), + metrics, + err_context, + ); } - Outgoing::Fragments(_) => panic!("Expected packet got fragment"), } + Err(err) => metrics.track_connection_error(address, &err, err_context), } } diff --git a/src/packet.rs b/src/packet.rs index 9c5ad403..525b1a1e 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -8,9 +8,9 @@ mod packet_reader; mod packet_structure; pub use self::enums::{DeliveryGuarantee, OrderingGuarantee, PacketType}; -pub use self::outgoing::{Outgoing, OutgoingPacket, OutgoingPacketBuilder}; +pub use self::outgoing::{OutgoingPacket, OutgoingPacketBuilder}; pub use self::packet_reader::PacketReader; -pub use self::packet_structure::Packet; +pub use self::packet_structure::{GenericPacket, Packet}; pub type SequenceNumber = u16; diff --git a/src/packet/enums.rs b/src/packet/enums.rs index 454f3634..8deda140 100644 --- a/src/packet/enums.rs +++ b/src/packet/enums.rs @@ -84,12 +84,14 @@ impl TryFrom for OrderingGuarantee { #[derive(Copy, Clone, Debug, PartialOrd, PartialEq)] /// Id to identify a certain packet type. pub enum PacketType { - /// Full packet that is not fragmented + /// User full packet that is not fragmented Packet = 0, - /// Fragment of a full packet + /// User fragment of a full packet Fragment = 1, /// Heartbeat packet Heartbeat = 2, + /// Connection manager specific packet + Connection = 3, } impl EnumConverter for PacketType { @@ -107,6 +109,7 @@ impl TryFrom for PacketType { 0 => Ok(PacketType::Packet), 1 => Ok(PacketType::Fragment), 2 => Ok(PacketType::Heartbeat), + 3 => Ok(PacketType::Connection), _ => Err(ErrorKind::DecodingError(DecodingErrorKind::PacketType)), } } diff --git a/src/packet/header/standard_header.rs b/src/packet/header/standard_header.rs index 4eccaafe..85ec7934 100644 --- a/src/packet/header/standard_header.rs +++ b/src/packet/header/standard_header.rs @@ -48,21 +48,10 @@ impl StandardHeader { } /// Returns the PacketType - #[cfg(test)] pub fn packet_type(&self) -> PacketType { self.packet_type } - /// Returns true if the packet is a heartbeat packet, false otherwise - pub fn is_heartbeat(&self) -> bool { - self.packet_type == PacketType::Heartbeat - } - - /// Returns true if the packet is a fragment, false if not - pub fn is_fragment(&self) -> bool { - self.packet_type == PacketType::Fragment - } - /// Checks if the protocol version in the packet is a valid version pub fn is_current_protocol(&self) -> bool { ProtocolVersion::valid_version(self.protocol_version) diff --git a/src/packet/outgoing.rs b/src/packet/outgoing.rs index 94f4499d..9af8e3f2 100644 --- a/src/packet/outgoing.rs +++ b/src/packet/outgoing.rs @@ -122,14 +122,6 @@ impl<'p> OutgoingPacket<'p> { } } -/// Enum for storing different kinds of outgoing types with data. -pub enum Outgoing<'a> { - /// Represents a single packet. - Packet(OutgoingPacket<'a>), - /// Represents a packet that is fragmented and thus contains more than one `OutgoingPacket`. - Fragments(Vec>), -} - #[cfg(test)] mod tests { use crate::packet::PacketType; diff --git a/src/packet/packet_structure.rs b/src/packet/packet_structure.rs index 9dee0ba8..695a97df 100644 --- a/src/packet/packet_structure.rs +++ b/src/packet/packet_structure.rs @@ -1,4 +1,4 @@ -use crate::packet::{DeliveryGuarantee, OrderingGuarantee}; +use crate::packet::{DeliveryGuarantee, OrderingGuarantee, PacketType}; use std::net::SocketAddr; #[derive(Clone, PartialEq, Eq, Debug)] @@ -176,6 +176,57 @@ impl Packet { } } +/// This packet type have similar properties to `Packet` except that it doesn't own anything, and additionally has `PacketType`. +#[derive(Debug)] +pub struct GenericPacket<'a> { + /// defines packet type, currently `Fragment` type is not supported + pub(crate) packet_type: PacketType, + /// the raw payload of the packet + pub(crate) payload: &'a [u8], + /// defines on how the packet will be delivered. + pub(crate) delivery: DeliveryGuarantee, + /// defines on how the packet will be ordered. + pub(crate) ordering: OrderingGuarantee, +} + +impl<'a> GenericPacket<'a> { + /// Creates a connection manager specific packet + pub fn connection_packet( + payload: &'a [u8], + delivery: DeliveryGuarantee, + ordering: OrderingGuarantee, + ) -> Self { + Self { + packet_type: PacketType::Connection, + payload, + delivery, + ordering, + } + } + /// Creates a packet that can be received by the user. + pub fn user_packet( + payload: &'a [u8], + delivery: DeliveryGuarantee, + ordering: OrderingGuarantee, + ) -> Self { + Self { + packet_type: PacketType::Packet, + payload, + delivery, + ordering, + } + } + /// Creates a heart beat packet. + pub fn heartbeat_packet(payload: &'a [u8]) -> Self { + Self { + packet_type: PacketType::Heartbeat, + payload, + delivery: DeliveryGuarantee::Unreliable, + ordering: OrderingGuarantee::None, + } + } +} + #[cfg(test)] mod tests { use crate::packet::{DeliveryGuarantee, OrderingGuarantee, Packet};