Skip to content

Commit 4643fd3

Browse files
committed
fix: batch merge of bug fixes (PR #289) - issues #2782, #2783, #2784, #2785, #2786, #2788, #2789, #2791, #2792
Fixes: - #2782: Ctrl+W now correctly deletes word backward in TUI input - #2783: Insert key is now handled (placeholder for future overwrite mode) - #2784: Double-click now selects word at cursor in input field - #2785: Terminal state is properly saved/restored on Ctrl+Z suspend/resume - #2786: Windows console now enables VT processing for ANSI sequences - #2788: Improved async task cancellation with proper HTTP connection cleanup - #2789: Added graceful shutdown for WebSocket/HTTP sessions on server exit - #2791: MCP child processes now have filtered environment (no secrets) - #2792: TUI streaming output optimized with dirty tracking, reduced idle FPS
1 parent d8515b9 commit 4643fd3

File tree

14 files changed

+605
-35
lines changed

14 files changed

+605
-35
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cortex-app-server/src/lib.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,9 @@ where
6464
warn!("Use --auth to enable authentication.");
6565
}
6666

67-
let state = AppState::new(config.clone()).await?;
68-
let app = create_router(state);
67+
let state = Arc::new(AppState::new(config.clone()).await?);
68+
let state_for_cleanup = Arc::clone(&state);
69+
let app = create_router_with_state(state);
6970

7071
let addr: SocketAddr = config.listen_addr.parse()?;
7172
info!("Starting Cortex server on {}", addr);
@@ -102,6 +103,11 @@ where
102103
.with_graceful_shutdown(shutdown)
103104
.await?;
104105

106+
// Graceful shutdown: close all active sessions first
107+
// This ensures WebSocket clients receive proper close frames
108+
info!("Server shutting down, cleaning up active sessions...");
109+
state_for_cleanup.cli_session_manager.shutdown_all().await;
110+
105111
// Cleanup mDNS on shutdown
106112
if let Some(publisher) = mdns_publisher {
107113
if let Err(e) = publisher.unpublish().await {
@@ -117,6 +123,14 @@ where
117123

118124
/// Create the application router.
119125
pub fn create_router(state: AppState) -> Router {
126+
create_router_with_state(Arc::new(state))
127+
}
128+
129+
/// Create the application router with an Arc-wrapped state.
130+
///
131+
/// This variant is useful when you need to keep a reference to the state
132+
/// for cleanup purposes (e.g., during graceful shutdown).
133+
pub fn create_router_with_state(state: Arc<AppState>) -> Router {
120134
let api_routes = api::routes()
121135
.merge(websocket::routes())
122136
.merge(streaming::routes())
@@ -127,5 +141,5 @@ pub fn create_router(state: AppState) -> Router {
127141
.nest("/api/v1", api_routes)
128142
.layer(TraceLayer::new_for_http())
129143
.layer(CorsLayer::permissive())
130-
.with_state(Arc::new(state))
144+
.with_state(state)
131145
}

cortex-app-server/src/session_manager.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,33 @@ impl SessionManager {
411411
}
412412
}
413413

414+
/// Gracefully shutdown all active sessions.
415+
///
416+
/// This method should be called during server shutdown to ensure all
417+
/// WebSocket connections receive proper close frames and all in-progress
418+
/// requests are terminated cleanly.
419+
pub async fn shutdown_all(&self) {
420+
let session_ids: Vec<String> = {
421+
let sessions = self.sessions.read().await;
422+
sessions.keys().cloned().collect()
423+
};
424+
425+
if session_ids.is_empty() {
426+
info!("No active sessions to shutdown");
427+
return;
428+
}
429+
430+
info!("Shutting down {} active sessions", session_ids.len());
431+
432+
for session_id in session_ids {
433+
if let Err(e) = self.destroy_session(&session_id).await {
434+
warn!("Failed to shutdown session {}: {}", session_id, e);
435+
}
436+
}
437+
438+
info!("All sessions shutdown complete");
439+
}
440+
414441
/// List all active sessions.
415442
pub async fn list_sessions(&self) -> Vec<SessionInfo> {
416443
let sessions = self.sessions.read().await;

cortex-app-server/src/streaming.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,47 @@ impl CliSessionManager {
9797
let sessions = self.sessions.read().await;
9898
sessions.len()
9999
}
100+
101+
/// Gracefully shutdown all active CLI sessions.
102+
///
103+
/// This method should be called during server shutdown to ensure all
104+
/// streaming connections receive proper close frames and all in-progress
105+
/// requests are terminated cleanly.
106+
pub async fn shutdown_all(&self) {
107+
let session_ids: Vec<String> = {
108+
let sessions = self.sessions.read().await;
109+
sessions.keys().cloned().collect()
110+
};
111+
112+
if session_ids.is_empty() {
113+
info!("No active CLI sessions to shutdown");
114+
return;
115+
}
116+
117+
info!("Shutting down {} active CLI sessions", session_ids.len());
118+
119+
let mut sessions = self.sessions.write().await;
120+
for session_id in session_ids {
121+
if let Some(session) = sessions.remove(&session_id) {
122+
// Send shutdown command
123+
let _ = session
124+
.handle
125+
.submission_tx
126+
.send(Submission {
127+
id: Uuid::new_v4().to_string(),
128+
op: Op::Shutdown,
129+
})
130+
.await;
131+
132+
// Abort the session task
133+
session.session_task.abort();
134+
135+
debug!(session_id = %session_id, "CLI session shutdown");
136+
}
137+
}
138+
139+
info!("All CLI sessions shutdown complete");
140+
}
100141
}
101142

102143
// ============================================================================

cortex-core/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ doctest = false
1313
[dependencies]
1414
ratatui = { workspace = true }
1515
crossterm = { workspace = true }
16-
tokio = { workspace = true, features = ["full", "sync", "time", "macros", "rt-multi-thread"] }
16+
tokio = { workspace = true, features = ["full", "sync", "time", "macros", "rt-multi-thread", "signal"] }
1717
unicode-width = { workspace = true }
1818
pulldown-cmark = { workspace = true }
1919
image = { workspace = true }
@@ -22,6 +22,9 @@ anyhow = { workspace = true }
2222
futures = { workspace = true }
2323
tracing = { workspace = true }
2424

25+
[target.'cfg(unix)'.dependencies]
26+
libc = "0.2"
27+
2528
# Markdown rendering dependencies
2629
opentui-syntax = { workspace = true }
2730
opentui-text = { workspace = true }

cortex-core/src/frame_engine.rs

Lines changed: 118 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ pub enum EngineEvent {
8989
///
9090
/// Contains the pasted text content.
9191
Paste(String),
92+
93+
/// The process was suspended (SIGTSTP / Ctrl+Z on Unix).
94+
/// The application should save terminal state before this.
95+
Suspend,
96+
97+
/// The process was resumed (SIGCONT on Unix).
98+
/// The application should restore terminal state after this.
99+
Resume,
92100
}
93101

94102
/// High-performance frame engine for TUI applications.
@@ -210,6 +218,7 @@ impl FrameEngine {
210218
/// 1. **Tick events**: Generated at the configured tick rate
211219
/// 2. **Terminal events**: Keyboard, mouse, and resize events from crossterm
212220
/// 3. **Shutdown**: Monitors the running flag for graceful termination
221+
/// 4. **Unix signals**: SIGTSTP (Ctrl+Z) and SIGCONT for suspend/resume
213222
///
214223
/// # Errors
215224
///
@@ -230,43 +239,126 @@ impl FrameEngine {
230239
// Create the crossterm event stream
231240
let mut event_stream = EventStream::new();
232241

242+
// Set up Unix signal handlers for suspend/resume (Ctrl+Z)
243+
#[cfg(unix)]
244+
let mut sigtstp =
245+
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::from_raw(libc::SIGTSTP))
246+
.ok();
247+
248+
#[cfg(unix)]
249+
let mut sigcont =
250+
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::from_raw(libc::SIGCONT))
251+
.ok();
252+
233253
// Main event loop using tokio::select!
254+
// On Unix, we also handle SIGTSTP (Ctrl+Z suspend) and SIGCONT (resume) signals
234255
while self.running.load(Ordering::SeqCst) {
235256
let tick_interval = self
236257
.tick_interval
237258
.as_mut()
238259
.expect("tick interval initialized");
239260

240-
tokio::select! {
241-
// Branch 1: Tick interval for animations
242-
_ = tick_interval.tick() => {
243-
self.frame_count = self.frame_count.wrapping_add(1);
244-
self.send_event(EngineEvent::Tick(self.frame_count)).await?;
245-
}
261+
// Platform-specific select to handle signals on Unix
262+
#[cfg(unix)]
263+
{
264+
// Create futures for signal handling (always pending if signal registration failed)
265+
let sigtstp_fut = async {
266+
if let Some(ref mut sig) = sigtstp {
267+
sig.recv().await
268+
} else {
269+
std::future::pending().await
270+
}
271+
};
272+
let sigcont_fut = async {
273+
if let Some(ref mut sig) = sigcont {
274+
sig.recv().await
275+
} else {
276+
std::future::pending().await
277+
}
278+
};
279+
280+
tokio::select! {
281+
// Branch 1: Tick interval for animations
282+
_ = tick_interval.tick() => {
283+
self.frame_count = self.frame_count.wrapping_add(1);
284+
self.send_event(EngineEvent::Tick(self.frame_count)).await?;
285+
}
246286

247-
// Branch 2: Terminal events from crossterm
248-
maybe_event = event_stream.next() => {
249-
match maybe_event {
250-
Some(Ok(event)) => {
251-
if let Some(engine_event) = self.translate_event(event) {
252-
// Check for quit before sending
253-
let is_quit = matches!(engine_event, EngineEvent::Quit);
254-
self.send_event(engine_event).await?;
255-
256-
if is_quit {
257-
self.running.store(false, Ordering::SeqCst);
287+
// Branch 2: Terminal events from crossterm
288+
maybe_event = event_stream.next() => {
289+
match maybe_event {
290+
Some(Ok(event)) => {
291+
if let Some(engine_event) = self.translate_event(event) {
292+
// Check for quit before sending
293+
let is_quit = matches!(engine_event, EngineEvent::Quit);
294+
self.send_event(engine_event).await?;
295+
296+
if is_quit {
297+
self.running.store(false, Ordering::SeqCst);
298+
}
258299
}
259300
}
301+
Some(Err(e)) => {
302+
// Send error event but continue running
303+
let error_msg = format!("Event stream error: {e}");
304+
self.send_event(EngineEvent::Error(error_msg)).await?;
305+
}
306+
None => {
307+
// Event stream ended - this typically means terminal closed
308+
self.send_event(EngineEvent::Quit).await?;
309+
self.running.store(false, Ordering::SeqCst);
310+
}
260311
}
261-
Some(Err(e)) => {
262-
// Send error event but continue running
263-
let error_msg = format!("Event stream error: {e}");
264-
self.send_event(EngineEvent::Error(error_msg)).await?;
265-
}
266-
None => {
267-
// Event stream ended - this typically means terminal closed
268-
self.send_event(EngineEvent::Quit).await?;
269-
self.running.store(false, Ordering::SeqCst);
312+
}
313+
314+
// Branch 3: SIGTSTP (Ctrl+Z suspend)
315+
_ = sigtstp_fut => {
316+
// Send suspend event so the application can save terminal state
317+
self.send_event(EngineEvent::Suspend).await?;
318+
}
319+
320+
// Branch 4: SIGCONT (resume after suspend)
321+
_ = sigcont_fut => {
322+
// Send resume event so the application can restore terminal state
323+
self.send_event(EngineEvent::Resume).await?;
324+
}
325+
}
326+
}
327+
328+
// Non-Unix platforms: no signal handling
329+
#[cfg(not(unix))]
330+
{
331+
tokio::select! {
332+
// Branch 1: Tick interval for animations
333+
_ = tick_interval.tick() => {
334+
self.frame_count = self.frame_count.wrapping_add(1);
335+
self.send_event(EngineEvent::Tick(self.frame_count)).await?;
336+
}
337+
338+
// Branch 2: Terminal events from crossterm
339+
maybe_event = event_stream.next() => {
340+
match maybe_event {
341+
Some(Ok(event)) => {
342+
if let Some(engine_event) = self.translate_event(event) {
343+
// Check for quit before sending
344+
let is_quit = matches!(engine_event, EngineEvent::Quit);
345+
self.send_event(engine_event).await?;
346+
347+
if is_quit {
348+
self.running.store(false, Ordering::SeqCst);
349+
}
350+
}
351+
}
352+
Some(Err(e)) => {
353+
// Send error event but continue running
354+
let error_msg = format!("Event stream error: {e}");
355+
self.send_event(EngineEvent::Error(error_msg)).await?;
356+
}
357+
None => {
358+
// Event stream ended - this typically means terminal closed
359+
self.send_event(EngineEvent::Quit).await?;
360+
self.running.store(false, Ordering::SeqCst);
361+
}
270362
}
271363
}
272364
}

0 commit comments

Comments
 (0)