fix: return bound endpoints after server started #113
base: main
```diff
@@ -12,6 +12,8 @@ use std::{
     sync::Arc,
     time::Duration,
 };
+use tokio::task::JoinSet;
+use tokio_util::sync::CancellationToken;
 use tower_http::{
     trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer},
     LatencyUnit,
```
```diff
@@ -30,7 +32,7 @@ const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(5);
 /// The server configuration.
 pub struct Config {
-    operator_key: PrivateKey,
+    operator_key: Option<PrivateKey>,
     addr: Option<SocketAddr>,
     data_store: Option<Box<dyn datastore::DataStore>>,
     content_dir: Option<PathBuf>,
```
```diff
@@ -58,7 +60,7 @@ impl Config {
     /// Creates a new server configuration.
     pub fn new(operator_key: PrivateKey) -> Self {
         Self {
-            operator_key,
+            operator_key: Some(operator_key),
            addr: None,
            data_store: None,
            content_dir: None,
```
```diff
@@ -120,60 +122,68 @@ impl Config {
 /// Represents the warg registry server.
 pub struct Server {
     config: Config,
-    listener: Option<TcpListener>,
+    endpoints: Option<Endpoints>,
+    token: CancellationToken,
+    tasks: JoinSet<Result<()>>,
 }

+/// The bound endpoints for the warg registry server.
+#[derive(Clone)]
+pub struct Endpoints {
+    /// The address of the API endpoint.
+    pub api: SocketAddr,
+}
+
 impl Server {
     /// Creates a new server with the given configuration.
     pub fn new(config: Config) -> Self {
         Self {
             config,
-            listener: None,
+            token: CancellationToken::new(),
+            endpoints: None,
+            tasks: JoinSet::new(),
         }
     }

-    /// Binds the server to the configured address.
+    /// Starts the server and binds its endpoints to the configured addresses.
     ///
-    /// Returns the address the server bound to.
-    pub fn bind(&mut self) -> Result<SocketAddr> {
+    /// Returns the endpoints the server bound to.
+    pub async fn start(&mut self) -> Result<Endpoints> {
+        assert!(
+            self.endpoints.is_none(),
+            "cannot start server multiple times"
+        );
```
Comment on lines +152 to +155:
Member:
So I'm personally not a big fan of taking […]. What if we renamed this […]?

Collaborator:
Agree, I was just about to ask why required fields were becoming optional. I'd much rather have spawning produce a dedicated type that represents the info for a started service.

Contributor (Author):
Was just trying to minimize the patch, since the existing code was […]. Things became […]. See the other comment about providing a […].

Member:
The existing code for […]. That said, I think perhaps we could better model our […]. That is, you construct/build a server; it internally binds based on its configuration; the binding can be inspected via a method. In the case of hyper there is only a single bound address for the server, but our implementation might have multiple bound addresses, as there might be multiple hyper servers internally (e.g. API, monitoring, etc.). Finally, it implements […].

Contributor (Author):
Cool, yes, I'd follow some patterns already used by some existing servers, like the […].
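A rough sketch of the model described in the last two comments: construction binds internally, the binding can be inspected via a method, and running consumes the server. The type and method names here are illustrative only, not the crate's actual API, and the blocking `std::net::TcpListener` is just a stand-in for the real listener.

```rust
use std::net::{SocketAddr, TcpListener};

use anyhow::{Context, Result};

/// Illustrative only: a server that binds as part of construction, so no
/// field has to become an `Option` to represent "not bound yet".
pub struct BoundServer {
    listener: TcpListener,
}

impl BoundServer {
    /// Construction binds immediately; failing to bind is a construction error.
    pub fn bind(addr: SocketAddr) -> Result<Self> {
        let listener = TcpListener::bind(addr)
            .with_context(|| format!("failed to bind to address `{addr}`"))?;
        Ok(Self { listener })
    }

    /// The binding can be inspected at any point after construction.
    pub fn local_addr(&self) -> Result<SocketAddr> {
        self.listener
            .local_addr()
            .context("failed to get local address for listen socket")
    }

    /// Running consumes the server, mirroring how a hyper server is awaited.
    pub async fn run(self) -> Result<()> {
        // Hand `self.listener` to the HTTP stack here and await it.
        let _ = self.listener;
        Ok(())
    }
}
```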
```diff
+
+        tracing::debug!(
+            "using server configuration: {config:?}",
+            config = self.config
+        );
+
         let addr = self
             .config
             .addr
             .to_owned()
             .unwrap_or_else(|| DEFAULT_BIND_ADDRESS.parse().unwrap());

-        tracing::debug!("binding server to address `{addr}`");
+        tracing::debug!("binding api endpoint to address `{addr}`");
         let listener = TcpListener::bind(addr)
-            .with_context(|| format!("failed to bind to address `{addr}`"))?;
+            .with_context(|| format!("failed to bind api endpoint to address `{addr}`"))?;

         let addr = listener
             .local_addr()
-            .context("failed to get local address for listen socket")?;
-
-        tracing::debug!("server bound to address `{addr}`");
-        self.config.addr = Some(addr);
-        self.listener = Some(listener);
-        Ok(addr)
-    }
-
-    /// Runs the server.
-    pub async fn run(mut self) -> Result<()> {
-        if self.listener.is_none() {
-            self.bind()?;
-        }
-
-        let listener = self.listener.unwrap();
+            .context("failed to get local address for api endpoint listen socket")?;
+        tracing::debug!("api endpoint bound to address `{addr}`");

-        tracing::debug!(
-            "using server configuration: {config:?}",
-            config = self.config
-        );
+        let endpoints = Endpoints { api: addr };
+        self.endpoints = Some(endpoints.clone());

         let store = self
             .config
             .data_store
             .take()
             .unwrap_or_else(|| Box::<MemoryDataStore>::default());
         let (core, handle) = CoreService::spawn(
-            self.config.operator_key,
+            self.config.operator_key.take().unwrap(),
             store,
             self.config
                 .checkpoint_interval
```
```diff
@@ -183,32 +193,69 @@ impl Server {
         let server = axum::Server::from_tcp(listener)?.serve(
             Self::create_router(
-                format!("http://{addr}", addr = self.config.addr.unwrap()),
-                self.config.content_dir,
+                format!("http://{addr}", addr = endpoints.api),
+                self.config.content_dir.take(),
                 core,
             )?
             .into_make_service(),
         );

-        tracing::info!("listening on {addr}", addr = self.config.addr.unwrap());
+        tracing::info!("api endpoint listening on {addr}", addr = endpoints.api);

-        if let Some(shutdown) = self.config.shutdown {
-            tracing::debug!("server is running with a shutdown signal");
+        // Shut down core service when token cancelled.
+        let token = self.token.clone();
+        self.tasks.spawn(async move {
+            token.cancelled().await;
+            tracing::info!("waiting for core service to stop");
+            handle.stop().await;
+            Ok(())
+        });
```
Comment on lines +207 to +212:

Member:
Could this task be merged with the next task? That is to say, after the […]. Given this design, I would expect a 1:1 between a task in the task set and a running Hyper server (the API in this case, but soon a monitoring server, etc.). Each would shut down in its own way.

Contributor (Author):
I'd think that, due to the nature of fanning out a cancellation signal chain, a cancellation token should be cloned for each owned dependency, somewhat similar to how Go's Context works. Otherwise, for-loop boilerplate ends up being written to cancel "children" separately as the server grows. It also allows things to shut down in parallel easily if needed. But yes, we could keep things simple for now and merge the shutdown tasks instead of chaining them through the tokens themselves. However, I'd either want to use a broadcast channel instead, or an additional token for draining in #111.
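A minimal sketch of the token fan-out described above, assuming tokio-util's `CancellationToken`; the endpoint names and task bodies are hypothetical:

```rust
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

async fn fan_out_shutdown() -> anyhow::Result<()> {
    let root = CancellationToken::new();
    let mut tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();

    // Each owned dependency gets its own child token, similar to deriving a
    // Go Context per child, so no per-child bookkeeping loop is needed.
    for endpoint in ["api", "monitoring"] {
        let token = root.child_token();
        tasks.spawn(async move {
            token.cancelled().await;
            tracing::info!("{name} endpoint shutting down", name = endpoint);
            Ok(())
        });
    }

    // Cancelling the root propagates to every child at once, so children can
    // shut down in parallel if desired.
    root.cancel();
    while tasks.join_next().await.is_some() {}
    Ok(())
}
```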
Member:
I don't think we want the core service to be stopped in parallel with the hyper server shutting down, as it might race. If we call […]. If this is done in parallel with the hyper server shutting down, then it's possible a request being drained attempts to enqueue work for the core service to do; if it so happens that the core service has already stopped and the send limit for the message queue is reached, the request will block waiting for the core service to remove a message, which will never occur since its task has ended. We also don't want to close the message queue's sender as part of stopping the core service, as this would result in a panic (even a graceful error back to the client here is not ideal) and a reset of the connection for any request attempting to enqueue work for the core service after it has stopped. I believe the correct order of operations here is to shut down the hyper server and then join on the core service, like we had before.

Member:
Note that eventually the core service won't be part […].

Contributor (Author):
I was merely suggesting making parallel shutdown possible; I did not point to a use case where it applies yet. I'm well aware of the need to keep dependencies going (like the core service) while a user of them drains (like the API server). We share the same view of the shutdown order, but what are your thoughts on using a […]?
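A sketch of the ordering argued for in this thread, using stand-in types for the hyper server and the core service handle (the real ones in this diff come from `axum::Server::from_tcp` and `CoreService::spawn`): a single merged task drains the API endpoint first and only then stops the core service.

```rust
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

// Stand-in for the handle returned by `CoreService::spawn`.
struct CoreHandle;
impl CoreHandle {
    async fn stop(self) {
        // Join on the core service task here.
    }
}

// Stand-in for `server.with_graceful_shutdown(...).await` on the hyper server.
async fn serve_api(token: CancellationToken) -> anyhow::Result<()> {
    token.cancelled().await;
    Ok(())
}

fn spawn_api_task(
    tasks: &mut JoinSet<anyhow::Result<()>>,
    token: CancellationToken,
    handle: CoreHandle,
) {
    tasks.spawn(async move {
        // 1. Drain in-flight API requests first; while draining they may
        //    still enqueue work for the core service.
        serve_api(token).await?;
        // 2. Only after the API endpoint has stopped is it safe to stop the
        //    core service, since no new work can be enqueued.
        tracing::info!("api endpoint stopped; stopping core service");
        handle.stop().await;
        Ok(())
    });
}
```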
```diff
+        // Shut down server when token cancelled.
+        let token: CancellationToken = self.token.clone();
+        self.tasks.spawn(async move {
+            tracing::info!("waiting for api endpoint to stop");
             server
-                .with_graceful_shutdown(async move { shutdown.await })
+                .with_graceful_shutdown(async move { token.cancelled().await })
                 .await?;
+            Ok(())
+        });
+
+        // Cancel token if shutdown signal received.
+        if let Some(shutdown) = self.config.shutdown.take() {
+            tracing::debug!("server is running with a shutdown signal");
+            let token = self.token.clone();
+            tokio::spawn(async move {
+                tracing::info!("waiting for shutdown signal");
+                shutdown.await;
+                tracing::info!("shutting down server");
+                token.cancel();
+            });
         } else {
             tracing::debug!("server is running without a shutdown signal");
-            server.await?;
         }

-        tracing::info!("waiting for core service to stop");
-        handle.stop().await;
+        Ok(endpoints)
+    }
+
+    /// Waits on a started server to shutdown.
+    pub async fn join(&mut self) -> Result<()> {
+        while (self.tasks.join_next().await).is_some() {}
+        tracing::info!("server shutdown complete");
         Ok(())
     }
+
+    /// Starts the server and waits for completion.
+    pub async fn run(&mut self) -> Result<()> {
+        self.start().await?;
+        self.join().await?;
+        Ok(())
+    }
+
+    pub fn stop(&mut self) {
+        self.token.cancel();
+    }

     fn create_router(
         base_url: String,
         content_dir: Option<PathBuf>,
```
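For reference, a sketch of how a caller might drive the lifecycle introduced by this diff, based on the `start`/`stop`/`join` signatures above (configuration setup is elided, and the crate's `Config`, `Server`, and `anyhow::Result` are assumed to be in scope):

```rust
async fn serve(config: Config) -> anyhow::Result<()> {
    let mut server = Server::new(config);

    // `start` binds the listener and returns the bound endpoints, so a test
    // (or a caller binding to port 0) can learn the actual address up front.
    let endpoints = server.start().await?;
    tracing::info!("api endpoint available at http://{addr}", addr = endpoints.api);

    // ... issue requests against `endpoints.api`, wire up health checks, etc. ...

    // Request shutdown via the cancellation token, then wait for all tasks.
    server.stop();
    server.join().await
}
```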