diff --git a/src/http/mod.rs b/src/http/mod.rs index 119c467..7b71f7e 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -40,6 +40,10 @@ use std::rc::Rc; pub use http::Method; use smallvec::SmallVec; +/// Default capacity of the buffer when reading chunks of bytes from the stream +/// On OSX if chunk_size > bytes_available_on_stream, the read operation will block +pub const DEFAULT_CHUNK_SIZE: usize = 1024; + type HttpTlsConnection = Connection>>; /// Re-usable container to store headers @@ -96,14 +100,14 @@ impl<'a> Headers<'a> { } /// A generic HTTP client that uses a pooled connection strategy. -pub struct HttpClient { +pub struct HttpClient, const CHUNK_SIZE: usize = DEFAULT_CHUNK_SIZE> { connection_pool: Rc>, headers: Headers<'static>, } -impl HttpClient { +impl, const CHUNK_SIZE: usize> HttpClient { /// Create a new HTTP client from the provided pool. - pub fn new(connection_pool: C) -> HttpClient { + pub fn new(connection_pool: C) -> HttpClient { Self { connection_pool: Rc::new(RefCell::new(connection_pool)), headers: Headers { @@ -138,7 +142,7 @@ impl HttpClient { path: impl AsRef, body: Option<&[u8]>, builder: F, - ) -> io::Result> + ) -> io::Result> where F: FnOnce(&mut Headers), { @@ -173,18 +177,21 @@ impl HttpClient { method: Method, path: impl AsRef, body: Option<&[u8]>, - ) -> io::Result> { + ) -> io::Result> { self.new_request_with_headers(method, path, body, |_| {}) } } /// Trait defining a pool of reusable connections. -pub trait ConnectionPool: Sized { +pub trait ConnectionPool: Sized { /// Underlying stream type. type Stream: Read + Write; /// Turn this connection pool into http client. - fn into_http_client(self) -> HttpClient { + fn into_http_client(self) -> HttpClient + where + Self: ConnectionPool, + { HttpClient::new(self) } @@ -192,10 +199,10 @@ pub trait ConnectionPool: Sized { fn host(&self) -> &str; /// Acquire next free connection, if available. - fn acquire(&mut self) -> io::Result>>; + fn acquire(&mut self) -> io::Result>>; /// Release a connection back into the pool. - fn release(&mut self, stream: Option>); + fn release(&mut self, stream: Option>); } /// A single-connection pool over TLS, reconnecting on demand. @@ -258,8 +265,8 @@ impl ConnectionPool for SingleTlsConnectionPool { } /// Represents an in-flight HTTP exchange. -pub struct HttpRequest { - conn: Option>, +pub struct HttpRequest, const CHUNK_SIZE: usize = DEFAULT_CHUNK_SIZE> { + conn: Option>, pool: Rc>, state: State, } @@ -278,15 +285,15 @@ enum State { }, } -impl HttpRequest { +impl, const CHUNK_SIZE: usize> HttpRequest { fn new( method: Method, path: impl AsRef, body: Option<&[u8]>, headers: &Headers, - mut conn: Connection, + mut conn: Connection, pool: Rc>, - ) -> io::Result> { + ) -> io::Result> { conn.write_all(method.as_str().as_bytes())?; conn.write_all(b" ")?; conn.write_all(path.as_ref().as_bytes())?; @@ -440,7 +447,7 @@ impl HttpRequest { } } -impl Drop for HttpRequest { +impl, const CHUNK_SIZE: usize> Drop for HttpRequest { fn drop(&mut self) { if let Some(conn) = self.conn.as_mut() { conn.buffer.clear(); @@ -451,7 +458,7 @@ impl Drop for HttpRequest { /// Connection managed by the `ConnectionPool`. Binds underlying stream together with buffer used /// for reading data. The reading is performed in chunks with default size of 1024 bytes. -pub struct Connection { +pub struct Connection { stream: S, buffer: Vec, disconnected: bool, @@ -480,7 +487,7 @@ impl Connection { } } -impl Write for Connection { +impl Write for Connection { #[inline] fn write(&mut self, buf: &[u8]) -> io::Result { self.stream.write(buf) @@ -493,8 +500,26 @@ impl Write for Connection { } impl Connection { + /// Creates a new connection wrapper around the provided stream. + /// + /// Initializes a read buffer with capacity equal to `CHUNK_SIZE` and sets up + /// the HTTP header boundary finder for parsing responses. + /// + /// # Arguments + /// + /// * `stream` - The underlying I/O stream to wrap + /// + /// # Examples + /// + /// ```no_run + /// use boomnet::http::Connection; + /// use boomnet::stream::tcp::TcpStream; + /// + /// let tcp = TcpStream::try_from(("127.0.0.1", 4222)).unwrap(); + /// let connection = Connection::<_, 1024>::new(tcp); + /// ``` #[inline] - fn new(stream: S) -> Self { + pub fn new(stream: S) -> Self { Self { stream, buffer: Vec::with_capacity(CHUNK_SIZE), @@ -502,6 +527,12 @@ impl Connection { header_finder: Finder::new(b"\r\n\r\n"), } } + + /// Returns whether the connection has been marked as disconnected. + #[inline] + pub const fn is_disconnected(&self) -> bool { + self.disconnected + } } #[cfg(test)]