Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/either.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#[derive(Debug)]
pub(crate) enum Either<L, R> {
Left(L),
Right(R),
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ mod throughput;
#[cfg(feature = "tester")]
pub use self::throughput::ThroughputMonitoring;

#[cfg(test)]
pub mod test_utils;

pub use self::config::Config;
pub use self::error::{ErrorKind, Result};
pub use self::net::{LinkConditioner, Socket, SocketEvent};
Expand Down
4 changes: 4 additions & 0 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
//! You can think of the socket, connection management, congestion control.

mod connection;
mod connection_impl;
mod connection_manager;
mod events;
mod link_conditioner;
mod quality;
Expand All @@ -10,6 +12,8 @@ mod virtual_connection;

pub mod constants;

pub use self::connection::{Connection, ConnectionEventAddress, ConnectionMessenger};
pub use self::connection_manager::{ConnectionManager, DatagramSocket};
pub use self::events::SocketEvent;
pub use self::link_conditioner::LinkConditioner;
pub use self::quality::{NetworkQuality, RttMeasurer};
Expand Down
260 changes: 73 additions & 187 deletions src/net/connection.rs
Original file line number Diff line number Diff line change
@@ -1,187 +1,73 @@
pub use crate::net::{NetworkQuality, RttMeasurer, VirtualConnection};

use crate::config::Config;
use crate::either::Either::{self, Left, Right};
use std::{
collections::HashMap,
net::SocketAddr,
time::{Duration, Instant},
};

/// Maintains a registry of active "connections". Essentially, when we receive a packet on the
/// socket from a particular `SocketAddr`, we will track information about it here.
#[derive(Debug)]
pub struct ActiveConnections {
connections: HashMap<SocketAddr, VirtualConnection>,
}

impl ActiveConnections {
pub fn new() -> Self {
Self {
connections: HashMap::new(),
}
}

/// Try to get a `VirtualConnection` by address. If the connection does not exist, it will be
/// inserted and returned.
pub fn get_or_insert_connection(
&mut self,
address: SocketAddr,
config: &Config,
time: Instant,
) -> &mut VirtualConnection {
self.connections
.entry(address)
.or_insert_with(|| VirtualConnection::new(address, config, time))
}

/// Try to get or create a [VirtualConnection] by address. If the connection does not exist, it will be
/// created and returned, but not inserted into the table of active connections.
pub(crate) fn get_or_create_connection(
&mut self,
address: SocketAddr,
config: &Config,
time: Instant,
) -> Either<&mut VirtualConnection, VirtualConnection> {
if let Some(connection) = self.connections.get_mut(&address) {
Left(connection)
} else {
Right(VirtualConnection::new(address, config, time))
}
}

/// Removes the connection from `ActiveConnections` by socket address.
pub fn remove_connection(
&mut self,
address: &SocketAddr,
) -> Option<(SocketAddr, VirtualConnection)> {
self.connections.remove_entry(address)
}

/// Check for and return `VirtualConnection`s which have been idling longer than `max_idle_time`.
pub fn idle_connections(&mut self, max_idle_time: Duration, time: Instant) -> Vec<SocketAddr> {
self.connections
.iter()
.filter(|(_, connection)| connection.last_heard(time) >= max_idle_time)
.map(|(address, _)| *address)
.collect()
}

/// Get a list of addresses of dead connections
pub fn dead_connections(&mut self) -> Vec<SocketAddr> {
self.connections
.iter()
.filter(|(_, connection)| connection.should_be_dropped())
.map(|(address, _)| *address)
.collect()
}

/// Check for and return `VirtualConnection`s which have not sent anything for a duration of at least `heartbeat_interval`.
pub fn heartbeat_required_connections(
&mut self,
heartbeat_interval: Duration,
time: Instant,
) -> impl Iterator<Item = &mut VirtualConnection> {
self.connections
.iter_mut()
.filter(move |(_, connection)| connection.last_sent(time) >= heartbeat_interval)
.map(|(_, connection)| connection)
}

/// Returns true if the given connection exists.
pub fn exists(&self, address: &SocketAddr) -> bool {
self.connections.contains_key(&address)
}

/// Returns the number of connected clients.
#[cfg(test)]
pub(crate) fn count(&self) -> usize {
self.connections.len()
}
}

#[cfg(test)]
mod tests {
use super::{ActiveConnections, Config};
use std::{
sync::Arc,
time::{Duration, Instant},
};

const ADDRESS: &str = "127.0.0.1:12345";

#[test]
fn connection_timed_out() {
let mut connections = ActiveConnections::new();
let config = Config::default();

let now = Instant::now();

// add 10 clients
for i in 0..10 {
connections.get_or_insert_connection(
format!("127.0.0.1:122{}", i).parse().unwrap(),
&config,
now,
);
}

assert_eq!(connections.count(), 10);

let wait = Duration::from_millis(200);

#[cfg(not(windows))]
let epsilon = Duration::from_nanos(1);
#[cfg(windows)]
let epsilon = Duration::from_millis(1);

let timed_out_connections = connections.idle_connections(wait, now + wait - epsilon);
assert_eq!(timed_out_connections.len(), 0);

let timed_out_connections = connections.idle_connections(wait, now + wait + epsilon);
assert_eq!(timed_out_connections.len(), 10);
}

#[test]
fn insert_connection() {
let mut connections = ActiveConnections::new();
let config = Config::default();

let address = ADDRESS.parse().unwrap();
connections.get_or_insert_connection(address, &config, Instant::now());
assert!(connections.connections.contains_key(&address));
}

#[test]
fn insert_existing_connection() {
let mut connections = ActiveConnections::new();
let config = Config::default();

let address = ADDRESS.parse().unwrap();
connections.get_or_insert_connection(address, &config, Instant::now());
assert!(connections.connections.contains_key(&address));
connections.get_or_insert_connection(address, &config, Instant::now());
assert!(connections.connections.contains_key(&address));
}

#[test]
fn remove_connection() {
let mut connections = ActiveConnections::new();
let config = Arc::new(Config::default());

let address = ADDRESS.parse().unwrap();
connections.get_or_insert_connection(address, &config, Instant::now());
assert!(connections.connections.contains_key(&address));
connections.remove_connection(&address);
assert!(!connections.connections.contains_key(&address));
}

#[test]
fn remove_non_existent_connection() {
let mut connections = ActiveConnections::new();

let address = &ADDRESS.parse().unwrap();
connections.remove_connection(address);
assert!(!connections.connections.contains_key(address));
}
}
use crate::config::Config;

use std::{self, fmt::Debug, net::SocketAddr, time::Instant};

/// Allows connection to send packet, send event and get global configuration.
pub trait ConnectionMessenger<ReceiveEvent: Debug> {
/// Returns global configuration.
fn config(&self) -> &Config;

/// Sends a connection event.
fn send_event(&mut self, address: &SocketAddr, event: ReceiveEvent);
/// Sends a packet.
fn send_packet(&mut self, address: &SocketAddr, payload: &[u8]);
}

/// Returns an address of an event.
/// This is used by a `ConnectionManager`, because it doesn't know anything about connection events.
pub trait ConnectionEventAddress {
/// Returns event address
fn address(&self) -> SocketAddr;
}

/// Allows to implement actual connection.
/// Defines a type of `Send` and `Receive` events, that will be used by a connection.
pub trait Connection: Debug {
/// Defines a user event type.
type SendEvent: Debug + ConnectionEventAddress;
/// Defines a connection event type.
type ReceiveEvent: Debug + ConnectionEventAddress;

/// Creates new connection and initialize it by sending an connection event to the user.
/// * messenger - allows to send packets and events, also provides a config.
/// * address - defines a address that connection is associated with.
/// * time - creation time, used by connection, so that it doesn't get dropped immediately or send heartbeat packet.
/// * initial_data - if initiated by remote host, this will hold that a packet data.
fn create_connection(
messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>,
address: SocketAddr,
time: Instant,
initial_data: Option<&[u8]>,
) -> Self;

/// Determines if the connection should be dropped due to its state.
fn should_drop(
&mut self,
messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>,
time: Instant,
) -> bool;

/// Processes a received packet: parse it and emit an event.
fn process_packet(
&mut self,
messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>,
payload: &[u8],
time: Instant,
);

/// Processes a received event and send a packet.
fn process_event(
&mut self,
messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>,
event: Self::SendEvent,
time: Instant,
);

/// Processes various connection-related tasks: resend dropped packets, send heartbeat packet, etc...
/// This function gets called frequently.
fn update(
&mut self,
messenger: &mut impl ConnectionMessenger<Self::ReceiveEvent>,
time: Instant,
);
}
Loading