-
Notifications
You must be signed in to change notification settings - Fork 68
Codebase improvements. #258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
TimonPost
merged 6 commits into
TimonPost:master
from
fraillt:better_interfaces_separation
Oct 10, 2019
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
7598af4
SocketController, ConnectionController and Utilities for unit tests
fraillt 3e07d88
Suggestions from code review. ROUND1
fraillt 441732c
link conditioner behind feature=tester
fraillt c828ff0
bugfix in test utils, and more integration tests
fraillt 2110b46
Addressing some issues from code review: ROUND3
fraillt 024690b
new Connection trait and connection manager
fraillt File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| #[derive(Debug)] | ||
| pub(crate) enum Either<L, R> { | ||
| Left(L), | ||
| Right(R), | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,187 +1,73 @@ | ||
| pub use crate::net::{NetworkQuality, RttMeasurer, VirtualConnection}; | ||
|
|
||
| use crate::config::Config; | ||
| use crate::either::Either::{self, Left, Right}; | ||
| use std::{ | ||
| collections::HashMap, | ||
| net::SocketAddr, | ||
| time::{Duration, Instant}, | ||
| }; | ||
|
|
||
| /// Maintains a registry of active "connections". Essentially, when we receive a packet on the | ||
| /// socket from a particular `SocketAddr`, we will track information about it here. | ||
| #[derive(Debug)] | ||
| pub struct ActiveConnections { | ||
| connections: HashMap<SocketAddr, VirtualConnection>, | ||
| } | ||
|
|
||
| impl ActiveConnections { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| connections: HashMap::new(), | ||
| } | ||
| } | ||
|
|
||
| /// Try to get a `VirtualConnection` by address. If the connection does not exist, it will be | ||
| /// inserted and returned. | ||
| pub fn get_or_insert_connection( | ||
| &mut self, | ||
| address: SocketAddr, | ||
| config: &Config, | ||
| time: Instant, | ||
| ) -> &mut VirtualConnection { | ||
| self.connections | ||
| .entry(address) | ||
| .or_insert_with(|| VirtualConnection::new(address, config, time)) | ||
| } | ||
|
|
||
| /// Try to get or create a [VirtualConnection] by address. If the connection does not exist, it will be | ||
| /// created and returned, but not inserted into the table of active connections. | ||
| pub(crate) fn get_or_create_connection( | ||
| &mut self, | ||
| address: SocketAddr, | ||
| config: &Config, | ||
| time: Instant, | ||
| ) -> Either<&mut VirtualConnection, VirtualConnection> { | ||
| if let Some(connection) = self.connections.get_mut(&address) { | ||
| Left(connection) | ||
| } else { | ||
| Right(VirtualConnection::new(address, config, time)) | ||
| } | ||
| } | ||
|
|
||
| /// Removes the connection from `ActiveConnections` by socket address. | ||
| pub fn remove_connection( | ||
| &mut self, | ||
| address: &SocketAddr, | ||
| ) -> Option<(SocketAddr, VirtualConnection)> { | ||
| self.connections.remove_entry(address) | ||
| } | ||
|
|
||
| /// Check for and return `VirtualConnection`s which have been idling longer than `max_idle_time`. | ||
| pub fn idle_connections(&mut self, max_idle_time: Duration, time: Instant) -> Vec<SocketAddr> { | ||
| self.connections | ||
| .iter() | ||
| .filter(|(_, connection)| connection.last_heard(time) >= max_idle_time) | ||
| .map(|(address, _)| *address) | ||
| .collect() | ||
| } | ||
|
|
||
| /// Get a list of addresses of dead connections | ||
| pub fn dead_connections(&mut self) -> Vec<SocketAddr> { | ||
| self.connections | ||
| .iter() | ||
| .filter(|(_, connection)| connection.should_be_dropped()) | ||
| .map(|(address, _)| *address) | ||
| .collect() | ||
| } | ||
|
|
||
| /// Check for and return `VirtualConnection`s which have not sent anything for a duration of at least `heartbeat_interval`. | ||
| pub fn heartbeat_required_connections( | ||
| &mut self, | ||
| heartbeat_interval: Duration, | ||
| time: Instant, | ||
| ) -> impl Iterator<Item = &mut VirtualConnection> { | ||
| self.connections | ||
| .iter_mut() | ||
| .filter(move |(_, connection)| connection.last_sent(time) >= heartbeat_interval) | ||
| .map(|(_, connection)| connection) | ||
| } | ||
|
|
||
| /// Returns true if the given connection exists. | ||
| pub fn exists(&self, address: &SocketAddr) -> bool { | ||
| self.connections.contains_key(&address) | ||
| } | ||
|
|
||
| /// Returns the number of connected clients. | ||
| #[cfg(test)] | ||
| pub(crate) fn count(&self) -> usize { | ||
| self.connections.len() | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::{ActiveConnections, Config}; | ||
| use std::{ | ||
| sync::Arc, | ||
| time::{Duration, Instant}, | ||
| }; | ||
|
|
||
| const ADDRESS: &str = "127.0.0.1:12345"; | ||
|
|
||
| #[test] | ||
| fn connection_timed_out() { | ||
| let mut connections = ActiveConnections::new(); | ||
| let config = Config::default(); | ||
|
|
||
| let now = Instant::now(); | ||
|
|
||
| // add 10 clients | ||
| for i in 0..10 { | ||
| connections.get_or_insert_connection( | ||
| format!("127.0.0.1:122{}", i).parse().unwrap(), | ||
| &config, | ||
| now, | ||
| ); | ||
| } | ||
|
|
||
| assert_eq!(connections.count(), 10); | ||
|
|
||
| let wait = Duration::from_millis(200); | ||
|
|
||
| #[cfg(not(windows))] | ||
| let epsilon = Duration::from_nanos(1); | ||
| #[cfg(windows)] | ||
| let epsilon = Duration::from_millis(1); | ||
|
|
||
| let timed_out_connections = connections.idle_connections(wait, now + wait - epsilon); | ||
| assert_eq!(timed_out_connections.len(), 0); | ||
|
|
||
| let timed_out_connections = connections.idle_connections(wait, now + wait + epsilon); | ||
| assert_eq!(timed_out_connections.len(), 10); | ||
| } | ||
|
|
||
| #[test] | ||
| fn insert_connection() { | ||
| let mut connections = ActiveConnections::new(); | ||
| let config = Config::default(); | ||
|
|
||
| let address = ADDRESS.parse().unwrap(); | ||
| connections.get_or_insert_connection(address, &config, Instant::now()); | ||
| assert!(connections.connections.contains_key(&address)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn insert_existing_connection() { | ||
| let mut connections = ActiveConnections::new(); | ||
| let config = Config::default(); | ||
|
|
||
| let address = ADDRESS.parse().unwrap(); | ||
| connections.get_or_insert_connection(address, &config, Instant::now()); | ||
| assert!(connections.connections.contains_key(&address)); | ||
| connections.get_or_insert_connection(address, &config, Instant::now()); | ||
| assert!(connections.connections.contains_key(&address)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn remove_connection() { | ||
| let mut connections = ActiveConnections::new(); | ||
| let config = Arc::new(Config::default()); | ||
|
|
||
| let address = ADDRESS.parse().unwrap(); | ||
| connections.get_or_insert_connection(address, &config, Instant::now()); | ||
| assert!(connections.connections.contains_key(&address)); | ||
| connections.remove_connection(&address); | ||
| assert!(!connections.connections.contains_key(&address)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn remove_non_existent_connection() { | ||
| let mut connections = ActiveConnections::new(); | ||
|
|
||
| let address = &ADDRESS.parse().unwrap(); | ||
| connections.remove_connection(address); | ||
| assert!(!connections.connections.contains_key(address)); | ||
| } | ||
| } | ||
| use crate::config::Config; | ||
|
|
||
| use std::{self, fmt::Debug, net::SocketAddr, time::Instant}; | ||
|
|
||
| /// Allows connection to send packet, send event and get global configuration. | ||
| pub trait ConnectionMessenger<ReceiveEvent: Debug> { | ||
| /// Returns global configuration. | ||
| fn config(&self) -> &Config; | ||
|
|
||
| /// Sends a connection event. | ||
| fn send_event(&mut self, address: &SocketAddr, event: ReceiveEvent); | ||
| /// Sends a packet. | ||
| fn send_packet(&mut self, address: &SocketAddr, payload: &[u8]); | ||
| } | ||
|
|
||
| /// Returns an address of an event. | ||
| /// This is used by a `ConnectionManager`, because it doesn't know anything about connection events. | ||
| pub trait ConnectionEventAddress { | ||
| /// Returns event address | ||
| fn address(&self) -> SocketAddr; | ||
| } | ||
|
|
||
| /// Allows to implement actual connection. | ||
| /// Defines a type of `Send` and `Receive` events, that will be used by a connection. | ||
| pub trait Connection: Debug { | ||
| /// Defines a user event type. | ||
| type SendEvent: Debug + ConnectionEventAddress; | ||
| /// Defines a connection event type. | ||
| type ReceiveEvent: Debug + ConnectionEventAddress; | ||
|
|
||
| /// Creates new connection and initialize it by sending an connection event to the user. | ||
| /// * messenger - allows to send packets and events, also provides a config. | ||
| /// * address - defines a address that connection is associated with. | ||
| /// * time - creation time, used by connection, so that it doesn't get dropped immediately or send heartbeat packet. | ||
| /// * initial_data - if initiated by remote host, this will hold that a packet data. | ||
| fn create_connection( | ||
| messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>, | ||
| address: SocketAddr, | ||
| time: Instant, | ||
| initial_data: Option<&[u8]>, | ||
| ) -> Self; | ||
|
|
||
| /// Determines if the connection should be dropped due to its state. | ||
| fn should_drop( | ||
| &mut self, | ||
| messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>, | ||
| time: Instant, | ||
| ) -> bool; | ||
|
|
||
| /// Processes a received packet: parse it and emit an event. | ||
| fn process_packet( | ||
| &mut self, | ||
| messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>, | ||
| payload: &[u8], | ||
| time: Instant, | ||
| ); | ||
|
|
||
| /// Processes a received event and send a packet. | ||
| fn process_event( | ||
| &mut self, | ||
| messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>, | ||
| event: Self::SendEvent, | ||
| time: Instant, | ||
| ); | ||
|
|
||
| /// Processes various connection-related tasks: resend dropped packets, send heartbeat packet, etc... | ||
| /// This function gets called frequently. | ||
| fn update( | ||
| &mut self, | ||
| messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>, | ||
| time: Instant, | ||
| ); | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.