From eb1767ad772790002ccaf38c5867de6aabccf295 Mon Sep 17 00:00:00 2001 From: Kristopher Wuollett Date: Fri, 12 May 2023 11:21:45 -0500 Subject: [PATCH 1/4] fix: return bound endpoints after server started The impetus behind the change is to support adding monitoring endpoints with an optional additional TCP listener. While refactoring the server it was discovered that the run method moves the Server self, and so fixed #112 to also ensure that the API endpoint was running before tests could discover the listener's locally bound port. In addition a common cancellation token is added to the server to propagate shutdown to both the core service as well as the API server endpoint. It can be later used to support additional services later such as a monitoring endpoint(s). --- crates/server/src/lib.rs | 122 +++++++++++++++++++++++++++------------ tests/server.rs | 1 - tests/support/mod.rs | 6 +- 3 files changed, 87 insertions(+), 42 deletions(-) diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index c9c7a57d..316bbe5d 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -12,6 +12,8 @@ use std::{ sync::Arc, time::Duration, }; +use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; use tower_http::{ trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer}, LatencyUnit, @@ -30,7 +32,7 @@ const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(5); /// The server configuration. pub struct Config { - operator_key: PrivateKey, + operator_key: Option, addr: Option, data_store: Option>, content_dir: Option, @@ -58,7 +60,7 @@ impl Config { /// Creates a new server configuration. pub fn new(operator_key: PrivateKey) -> Self { Self { - operator_key, + operator_key: Some(operator_key), addr: None, data_store: None, content_dir: None, @@ -120,7 +122,15 @@ impl Config { /// Represents the warg registry server. pub struct Server { config: Config, - listener: Option, + endpoints: Option, + token: CancellationToken, + tasks: JoinSet>, +} + +/// The bound endpoints for the warg registry server. +#[derive(Clone)] +pub struct Endpoints { + pub api: SocketAddr, } impl Server { @@ -128,52 +138,51 @@ impl Server { pub fn new(config: Config) -> Self { Self { config, - listener: None, + token: CancellationToken::new(), + endpoints: None, + tasks: JoinSet::new(), } } - /// Binds the server to the configured address. + /// Starts the server on the configured address. /// - /// Returns the address the server bound to. - pub fn bind(&mut self) -> Result { + /// Returns the endpoints the server bound to. + pub async fn start(&mut self) -> Result { + assert!( + self.endpoints.is_none(), + "cannot start server multiple times" + ); + + tracing::debug!( + "using server configuration: {config:?}", + config = self.config + ); + let addr = self .config .addr + .to_owned() .unwrap_or_else(|| DEFAULT_BIND_ADDRESS.parse().unwrap()); - tracing::debug!("binding server to address `{addr}`"); + tracing::debug!("binding api endpoint to address `{addr}`"); let listener = TcpListener::bind(addr) - .with_context(|| format!("failed to bind to address `{addr}`"))?; + .with_context(|| format!("failed to bind api endpoint to address `{addr}`"))?; let addr = listener .local_addr() - .context("failed to get local address for listen socket")?; - - tracing::debug!("server bound to address `{addr}`"); - self.config.addr = Some(addr); - self.listener = Some(listener); - Ok(addr) - } - - /// Runs the server. - pub async fn run(mut self) -> Result<()> { - if self.listener.is_none() { - self.bind()?; - } - - let listener = self.listener.unwrap(); + .context("failed to get local address for api endpoint listen socket")?; + tracing::debug!("api endpoint bound to address `{addr}`"); - tracing::debug!( - "using server configuration: {config:?}", - config = self.config - ); + let endpoints = Endpoints { api: addr }; + self.endpoints = Some(endpoints.to_owned()); let store = self .config .data_store + .take() .unwrap_or_else(|| Box::::default()); let (core, handle) = CoreService::spawn( - self.config.operator_key, + self.config.operator_key.take().unwrap(), store, self.config .checkpoint_interval @@ -183,32 +192,69 @@ impl Server { let server = axum::Server::from_tcp(listener)?.serve( Self::create_router( - format!("http://{addr}", addr = self.config.addr.unwrap()), - self.config.content_dir, + format!("http://{addr}", addr = endpoints.api), + self.config.content_dir.take(), core, )? .into_make_service(), ); - tracing::info!("listening on {addr}", addr = self.config.addr.unwrap()); + tracing::info!("api endpoint listening on {addr}", addr = endpoints.api); - if let Some(shutdown) = self.config.shutdown { - tracing::debug!("server is running with a shutdown signal"); + // Shut down core service when token cancelled. + let token = self.token.clone(); + self.tasks.spawn(async move { + token.cancelled().await; + tracing::info!("waiting for core service to stop"); + handle.stop().await; + Ok(()) + }); + + // Shut down server when token cancelled. + let token: CancellationToken = self.token.clone(); + self.tasks.spawn(async move { + tracing::info!("waiting for api endpoint to stop"); server - .with_graceful_shutdown(async move { shutdown.await }) + .with_graceful_shutdown(async move { token.cancelled().await }) .await?; + Ok(()) + }); + + // Cancel token if shutdown signal received. + if let Some(shutdown) = self.config.shutdown.take() { + tracing::debug!("server is running with a shutdown signal"); + let token = self.token.clone(); + tokio::spawn(async move { + tracing::info!("waiting for shutdown signal"); + shutdown.await; + tracing::info!("shutting down server"); + token.cancel(); + }); } else { tracing::debug!("server is running without a shutdown signal"); - server.await?; } - tracing::info!("waiting for core service to stop"); - handle.stop().await; + Ok(endpoints) + } + + /// Waits on a started server to shutdown. + pub async fn join(&mut self) -> Result<()> { + while (self.tasks.join_next().await).is_some() {} tracing::info!("server shutdown complete"); + Ok(()) + } + /// Starts the server on the configured address and waits for completion. + pub async fn run(&mut self) -> Result<()> { + self.start().await?; + self.join().await?; Ok(()) } + pub fn stop(&mut self) { + self.token.cancel(); + } + fn create_router( base_url: String, content_dir: Option, diff --git a/tests/server.rs b/tests/server.rs index 505034c2..4cf735ad 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -4,7 +4,6 @@ use std::{fs, str::FromStr}; use warg_client::{api, Config, FileSystemClient, StorageLockResult}; use warg_crypto::{signing::PrivateKey, Encode, Signable}; use wit_component::DecodedWasm; - mod support; #[cfg(feature = "postgres")] diff --git a/tests/support/mod.rs b/tests/support/mod.rs index db397407..08192772 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -82,10 +82,10 @@ pub async fn spawn_server( } let mut server = Server::new(config); - let addr = server.bind()?; + let endpoints = server.start().await?; let task = tokio::spawn(async move { - server.run().await.unwrap(); + server.join().await.unwrap(); }); let instance = ServerInstance { @@ -94,7 +94,7 @@ pub async fn spawn_server( }; let config = warg_client::Config { - default_url: Some(format!("http://{addr}")), + default_url: Some(format!("http://{addr}", addr = endpoints.api)), registries_dir: Some(root.join("registries")), content_dir: Some(root.join("content")), }; From 41fd3d73dc389d8c1dae1bedb9b6c606076d855d Mon Sep 17 00:00:00 2001 From: Kristopher Wuollett Date: Fri, 12 May 2023 11:39:46 -0500 Subject: [PATCH 2/4] docs: fix wording of server methods --- crates/server/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 316bbe5d..5f450427 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -144,7 +144,7 @@ impl Server { } } - /// Starts the server on the configured address. + /// Starts the server and binds its endpoint to the configured address. /// /// Returns the endpoints the server bound to. pub async fn start(&mut self) -> Result { @@ -244,7 +244,7 @@ impl Server { Ok(()) } - /// Starts the server on the configured address and waits for completion. + /// Starts the server and waits for completion. pub async fn run(&mut self) -> Result<()> { self.start().await?; self.join().await?; From a46c7f0e64f8dcc8f3c8d18e2f9ae6653d8274e5 Mon Sep 17 00:00:00 2001 From: Kristopher Wuollett Date: Thu, 18 May 2023 12:10:33 -0500 Subject: [PATCH 3/4] Apply suggestions from code review docs: fix code comments on server Co-authored-by: Peter Huene Signed-off-by: Kristopher Wuollett --- crates/server/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 5f450427..37f72cc7 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -130,6 +130,7 @@ pub struct Server { /// The bound endpoints for the warg registry server. #[derive(Clone)] pub struct Endpoints { + /// The address of the API endpoint. pub api: SocketAddr, } @@ -144,7 +145,7 @@ impl Server { } } - /// Starts the server and binds its endpoint to the configured address. + /// Starts the server and binds its endpoints to the configured addresses. /// /// Returns the endpoints the server bound to. pub async fn start(&mut self) -> Result { From 4648e468df9ba77b5ac757f5a95ea791eef53d11 Mon Sep 17 00:00:00 2001 From: Kristopher Wuollett Date: Thu, 18 May 2023 12:11:38 -0500 Subject: [PATCH 4/4] fix: clarify copying of endpoints struct Co-authored-by: Peter Huene Signed-off-by: Kristopher Wuollett --- crates/server/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 37f72cc7..0b987342 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -175,7 +175,7 @@ impl Server { tracing::debug!("api endpoint bound to address `{addr}`"); let endpoints = Endpoints { api: addr }; - self.endpoints = Some(endpoints.to_owned()); + self.endpoints = Some(endpoints.clone()); let store = self .config