Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 49 additions & 18 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ use std::rc::Rc;
pub use http::Method;
use smallvec::SmallVec;

/// Default capacity of the buffer when reading chunks of bytes from the stream
/// On OSX if chunk_size > bytes_available_on_stream, the read operation will block
pub const DEFAULT_CHUNK_SIZE: usize = 1024;

type HttpTlsConnection = Connection<BufferedStream<TlsStream<TcpStream>>>;

/// Re-usable container to store headers
Expand Down Expand Up @@ -96,14 +100,14 @@ impl<'a> Headers<'a> {
}

/// A generic HTTP client that uses a pooled connection strategy.
pub struct HttpClient<C: ConnectionPool> {
pub struct HttpClient<C: ConnectionPool<CHUNK_SIZE>, const CHUNK_SIZE: usize = DEFAULT_CHUNK_SIZE> {
connection_pool: Rc<RefCell<C>>,
headers: Headers<'static>,
}

impl<C: ConnectionPool> HttpClient<C> {
impl<C: ConnectionPool<CHUNK_SIZE>, const CHUNK_SIZE: usize> HttpClient<C, CHUNK_SIZE> {
/// Create a new HTTP client from the provided pool.
pub fn new(connection_pool: C) -> HttpClient<C> {
pub fn new(connection_pool: C) -> HttpClient<C, CHUNK_SIZE> {
Self {
connection_pool: Rc::new(RefCell::new(connection_pool)),
headers: Headers {
Expand Down Expand Up @@ -138,7 +142,7 @@ impl<C: ConnectionPool> HttpClient<C> {
path: impl AsRef<str>,
body: Option<&[u8]>,
builder: F,
) -> io::Result<HttpRequest<C>>
) -> io::Result<HttpRequest<C, CHUNK_SIZE>>
where
F: FnOnce(&mut Headers),
{
Expand Down Expand Up @@ -173,29 +177,32 @@ impl<C: ConnectionPool> HttpClient<C> {
method: Method,
path: impl AsRef<str>,
body: Option<&[u8]>,
) -> io::Result<HttpRequest<C>> {
) -> io::Result<HttpRequest<C, CHUNK_SIZE>> {
self.new_request_with_headers(method, path, body, |_| {})
}
}

/// Trait defining a pool of reusable connections.
pub trait ConnectionPool: Sized {
pub trait ConnectionPool<const CHUNK_SIZE: usize = DEFAULT_CHUNK_SIZE>: Sized {
/// Underlying stream type.
type Stream: Read + Write;

/// Turn this connection pool into http client.
fn into_http_client(self) -> HttpClient<Self> {
fn into_http_client(self) -> HttpClient<Self, CHUNK_SIZE>
where
Self: ConnectionPool<CHUNK_SIZE>,
{
HttpClient::new(self)
}

/// Hostname for requests.
fn host(&self) -> &str;

/// Acquire next free connection, if available.
fn acquire(&mut self) -> io::Result<Option<Connection<Self::Stream>>>;
fn acquire(&mut self) -> io::Result<Option<Connection<Self::Stream, CHUNK_SIZE>>>;

/// Release a connection back into the pool.
fn release(&mut self, stream: Option<Connection<Self::Stream>>);
fn release(&mut self, stream: Option<Connection<Self::Stream, CHUNK_SIZE>>);
}

/// A single-connection pool over TLS, reconnecting on demand.
Expand Down Expand Up @@ -258,8 +265,8 @@ impl ConnectionPool for SingleTlsConnectionPool {
}

/// Represents an in-flight HTTP exchange.
pub struct HttpRequest<C: ConnectionPool> {
conn: Option<Connection<C::Stream>>,
pub struct HttpRequest<C: ConnectionPool<CHUNK_SIZE>, const CHUNK_SIZE: usize = DEFAULT_CHUNK_SIZE> {
conn: Option<Connection<C::Stream, CHUNK_SIZE>>,
pool: Rc<RefCell<C>>,
state: State,
}
Expand All @@ -278,15 +285,15 @@ enum State {
},
}

impl<C: ConnectionPool> HttpRequest<C> {
impl<C: ConnectionPool<CHUNK_SIZE>, const CHUNK_SIZE: usize> HttpRequest<C, CHUNK_SIZE> {
fn new(
method: Method,
path: impl AsRef<str>,
body: Option<&[u8]>,
headers: &Headers,
mut conn: Connection<C::Stream>,
mut conn: Connection<C::Stream, CHUNK_SIZE>,
pool: Rc<RefCell<C>>,
) -> io::Result<HttpRequest<C>> {
) -> io::Result<HttpRequest<C, CHUNK_SIZE>> {
conn.write_all(method.as_str().as_bytes())?;
conn.write_all(b" ")?;
conn.write_all(path.as_ref().as_bytes())?;
Expand Down Expand Up @@ -440,7 +447,7 @@ impl<C: ConnectionPool> HttpRequest<C> {
}
}

impl<C: ConnectionPool> Drop for HttpRequest<C> {
impl<C: ConnectionPool<CHUNK_SIZE>, const CHUNK_SIZE: usize> Drop for HttpRequest<C, CHUNK_SIZE> {
fn drop(&mut self) {
if let Some(conn) = self.conn.as_mut() {
conn.buffer.clear();
Expand All @@ -451,7 +458,7 @@ impl<C: ConnectionPool> Drop for HttpRequest<C> {

/// Connection managed by the `ConnectionPool`. Binds underlying stream together with buffer used
/// for reading data. The reading is performed in chunks with default size of 1024 bytes.
pub struct Connection<S, const CHUNK_SIZE: usize = 1024> {
pub struct Connection<S, const CHUNK_SIZE: usize = DEFAULT_CHUNK_SIZE> {
stream: S,
buffer: Vec<u8>,
disconnected: bool,
Expand Down Expand Up @@ -480,7 +487,7 @@ impl<S: Read + Write, const CHUNK_SIZE: usize> Connection<S, CHUNK_SIZE> {
}
}

impl<S: Write> Write for Connection<S> {
impl<S: Write, const CHUNK_SIZE: usize> Write for Connection<S, CHUNK_SIZE> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.stream.write(buf)
Expand All @@ -493,15 +500,39 @@ impl<S: Write> Write for Connection<S> {
}

impl<S, const CHUNK_SIZE: usize> Connection<S, CHUNK_SIZE> {
/// Creates a new connection wrapper around the provided stream.
///
/// Initializes a read buffer with capacity equal to `CHUNK_SIZE` and sets up
/// the HTTP header boundary finder for parsing responses.
///
/// # Arguments
///
/// * `stream` - The underlying I/O stream to wrap
///
/// # Examples
///
/// ```no_run
/// use boomnet::http::Connection;
/// use boomnet::stream::tcp::TcpStream;
///
/// let tcp = TcpStream::try_from(("127.0.0.1", 4222)).unwrap();
/// let connection = Connection::<_, 1024>::new(tcp);
/// ```
#[inline]
fn new(stream: S) -> Self {
pub fn new(stream: S) -> Self {
Self {
stream,
buffer: Vec::with_capacity(CHUNK_SIZE),
disconnected: false,
header_finder: Finder::new(b"\r\n\r\n"),
}
}

/// Returns whether the connection has been marked as disconnected.
#[inline]
pub const fn is_disconnected(&self) -> bool {
self.disconnected
}
}

#[cfg(test)]
Expand Down
Loading