From 043ee8e1b21a255fadcf1b19705575a69b27e50f Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Tue, 18 Nov 2025 16:45:55 +0100 Subject: [PATCH 1/3] feat: add ability to restart single service --- bec_lib/bec_lib/messages.py | 2 + .../bec_server/bec_server_utils/launch.py | 35 ++- .../bec_server_utils/service_handler.py | 138 ++++++++++- .../bec_server_utils/tmux_launch.py | 226 +++++++++++++++--- .../scihub/service_handler/service_handler.py | 47 +++- ...handler.py => test_cli_service_handler.py} | 63 +++++ .../tests/tests_bec_server_utils/test_main.py | 27 ++- .../test_tmux_launch.py | 175 +++++++++++++- .../tests_scihub/test_service_handler.py | 109 +++++++++ 9 files changed, 774 insertions(+), 48 deletions(-) rename bec_server/tests/tests_bec_server_utils/{test_service_handler.py => test_cli_service_handler.py} (52%) create mode 100644 bec_server/tests/tests_scihub/test_service_handler.py diff --git a/bec_lib/bec_lib/messages.py b/bec_lib/bec_lib/messages.py index fd1be62b0..ad4c73e7d 100644 --- a/bec_lib/bec_lib/messages.py +++ b/bec_lib/bec_lib/messages.py @@ -1178,11 +1178,13 @@ class ServiceRequestMessage(BECMessage): Args: action (Literal["restart"]): Action to be executed by the service + service_name (str | None): Name of the service to be restarted. If None, all services will be restarted. metadata (dict, optional): Metadata. Defaults to None. """ msg_type: ClassVar[str] = "service_request_message" action: Literal["restart"] + service_name: str | None = None class ProcedureRequestMessage(BECMessage): diff --git a/bec_server/bec_server/bec_server_utils/launch.py b/bec_server/bec_server/bec_server_utils/launch.py index 5c4117461..acfff86fb 100644 --- a/bec_server/bec_server/bec_server_utils/launch.py +++ b/bec_server/bec_server/bec_server_utils/launch.py @@ -29,7 +29,19 @@ def main(): default=None, help="Interface to use (tmux, iterm2, systemctl, subprocess)", ) - command.add_parser("stop", help="Stop the BEC server") + start.add_argument( + "--service", + type=str, + default=None, + help="Start a specific service only (e.g., scan_server, device_server)", + ) + stop = command.add_parser("stop", help="Stop the BEC server") + stop.add_argument( + "--service", + type=str, + default=None, + help="Stop a specific service only (e.g., scan_server, device_server)", + ) restart = command.add_parser("restart", help="Restart the BEC server") restart.add_argument( "--config", type=str, default=None, help="Path to the BEC service config file" @@ -40,6 +52,12 @@ def main(): default=None, help="Interface to use (tmux, iterm2, systemctl, subprocess)", ) + restart.add_argument( + "--service", + type=str, + default=None, + help="Restart a specific service only (e.g., scan_server, device_server)", + ) command.add_parser("attach", help="Open the currently running BEC server session") args = parser.parse_args() @@ -59,11 +77,20 @@ def main(): no_persistence=args.no_persistence if "no_persistence" in args else False, ) if args.command == "start": - service_handler.start() + if hasattr(args, "service") and args.service: + service_handler.start_service(args.service) + else: + service_handler.start() elif args.command == "stop": - service_handler.stop() + if hasattr(args, "service") and args.service: + service_handler.stop_service(args.service) + else: + service_handler.stop() elif args.command == "restart": - service_handler.restart() + if hasattr(args, "service") and args.service: + service_handler.restart_service(args.service) + else: + service_handler.restart() elif args.command == "attach": if os.path.exists("/tmp/tmux-shared/default"): # if we have a shared socket, use it diff --git a/bec_server/bec_server/bec_server_utils/service_handler.py b/bec_server/bec_server/bec_server_utils/service_handler.py index 2a7fe4177..121caf563 100644 --- a/bec_server/bec_server/bec_server_utils/service_handler.py +++ b/bec_server/bec_server/bec_server_utils/service_handler.py @@ -5,6 +5,7 @@ import sys import time from dataclasses import dataclass, field +from enum import Enum from string import Template from typing import Callable, Literal, Union @@ -12,7 +13,21 @@ from bec_lib.service_config import ServiceConfig from bec_server.bec_server_utils.subprocess_launch import subprocess_start, subprocess_stop -from bec_server.bec_server_utils.tmux_launch import tmux_start, tmux_stop +from bec_server.bec_server_utils.tmux_launch import ( + tmux_restart_service, + tmux_start, + tmux_start_service, + tmux_stop, + tmux_stop_service, +) + + +class ServiceOperation(str, Enum): + """Enum for service operations.""" + + STARTING = "Starting" + STOPPING = "Stopping" + RESTARTING = "Restarting" class bcolors: @@ -212,3 +227,124 @@ def restart(self): print("Restarting BEC server...") self.stop() self.start() + + def _validate_service(self, service_name: str) -> tuple[bool, ServiceDesc | None]: + """ + Validate that a service name exists in the available services. + + Args: + service_name (str): Name of the service to validate + + Returns: + tuple: (is_valid: bool, service_config: ServiceDesc or None) + """ + if service_name not in self.SERVICES: + print( + f"{bcolors.FAIL}Error: Unknown service '{service_name}'. Available services: {', '.join(self.SERVICES.keys())}{bcolors.ENDC}" + ) + return False, None + + service_config = copy.deepcopy(self.SERVICES[service_name]) + if self.config_path: + service_config.command += f" --config {self.config_path}" + + return True, service_config + + def _handle_service_operation( + self, + service_name: str, + operation: ServiceOperation, + tmux_func: Callable, + require_config: bool = True, + ) -> bool: + """ + Handle a service operation (start, stop, restart) with common validation and execution logic. + + Args: + service_name (str): Name of the service + operation (ServiceOperation): Operation type (STARTING, STOPPING, RESTARTING) + tmux_func (Callable): Function to call for tmux interface + require_config (bool): Whether the operation requires service config + + Returns: + bool: True if operation was successful, False otherwise + """ + is_valid, service_config = self._validate_service(service_name) + if not is_valid: + return False + + if self.interface == "tmux": + print(f"{operation.value} {service_name}...") + if require_config: + success = tmux_func(self.bec_path, service_name, service_config) + else: + success = tmux_func(service_name) + + if success: + print( + f"{bcolors.OKGREEN}Successfully {operation.value.lower()} {service_name}{bcolors.ENDC}" + ) + else: + print( + f"{bcolors.FAIL}Failed to {operation.value.lower()} {service_name}. Service not found in tmux session.{bcolors.ENDC}" + ) + return success + elif self.interface == "systemctl" and operation == ServiceOperation.RESTARTING: + print( + f"{bcolors.WARNING}Warning: {operation.value} individual services is not supported with systemctl. Please {operation.value.lower()} the entire BEC server.{bcolors.ENDC}" + ) + return False + elif ( + self.interface in ("iterm2", "subprocess") and operation == ServiceOperation.RESTARTING + ): + print( + f"{bcolors.WARNING}Warning: {operation.value} individual services is not supported with {self.interface}. Please {operation.value.lower()} the entire BEC server.{bcolors.ENDC}" + ) + return False + else: + print( + f"{bcolors.WARNING}Warning: {operation.value} individual services is only supported with tmux interface.{bcolors.ENDC}" + ) + return False + + def restart_service(self, service_name: str) -> bool: + """ + Restart a single service using the available interface. + + Args: + service_name (str): Name of the service to restart (e.g., "scan_server", "device_server") + + Returns: + bool: True if service was restarted successfully, False otherwise + """ + return self._handle_service_operation( + service_name, ServiceOperation.RESTARTING, tmux_restart_service, require_config=True + ) + + def start_service(self, service_name: str) -> bool: + """ + Start a single service using the available interface. + + Args: + service_name (str): Name of the service to start (e.g., "scan_server", "device_server") + + Returns: + bool: True if service was started successfully, False otherwise + """ + return self._handle_service_operation( + service_name, ServiceOperation.STARTING, tmux_start_service, require_config=True + ) + + def stop_service(self, service_name: str) -> bool: + """ + Stop a single service using the available interface. + + Args: + service_name (str): Name of the service to stop (e.g., "scan_server", "device_server") + + Returns: + bool: True if service was stopped successfully, False otherwise + """ + return self._handle_service_operation( + service_name, ServiceOperation.STOPPING, tmux_stop_service, require_config=False + ) diff --git a/bec_server/bec_server/bec_server_utils/tmux_launch.py b/bec_server/bec_server/bec_server_utils/tmux_launch.py index 170626453..b121e6af3 100644 --- a/bec_server/bec_server/bec_server_utils/tmux_launch.py +++ b/bec_server/bec_server/bec_server_utils/tmux_launch.py @@ -6,7 +6,7 @@ from libtmux.exc import LibTmuxException -def activate_venv(pane, service_name, service_path): +def activate_venv(pane: libtmux.Pane, service_name: str, service_path: str) -> None: """ Activate the python environment for a service. """ @@ -31,7 +31,17 @@ def activate_venv(pane, service_name, service_path): pane.send_keys(f"conda activate {os.path.basename(os.environ['CONDA_PREFIX'])}") -def get_new_session(tmux_session_name, window_label): +def get_new_session(tmux_session_name: str, window_label: str) -> libtmux.Session | None: + """ + Create a new tmux session with the given name and window label. + + Args: + tmux_session_name (str): Name of the tmux session + window_label (str): Label for the tmux window + + Returns: + libtmux.Session | None: The created tmux session object + """ if os.environ.get("INVOCATION_ID"): # running within systemd os.makedirs("/tmp/tmux-shared", exist_ok=True) @@ -63,7 +73,7 @@ def get_new_session(tmux_session_name, window_label): return session -def tmux_start(bec_path: str, services: dict): +def tmux_start(bec_path: str, services: dict) -> None: """ Launch services in a tmux session. All services are launched in separate panes. Services config dict contains "tmux_session_name" (default: "bec") and "window_label" (default: "BEC server", @@ -80,12 +90,22 @@ def tmux_start(bec_path: str, services: dict): if tmux_session_name not in sessions: tmux_window_label = service_config.tmux_session.window_label session = get_new_session(tmux_session_name, tmux_window_label) - pane = session.attached_window.attached_pane + if session is None: + raise RuntimeError(f"Failed to create tmux session '{tmux_session_name}'") + pane = session.attached_window.active_pane sessions[tmux_session_name] = session else: session = sessions[tmux_session_name] pane = session.attached_window.split_window(vertical=False) + if pane is None: + raise RuntimeError(f"Failed to create pane for service '{service}'") + + # Set pane title to service name for easy identification + session.attached_window.set_window_option("pane-border-status", "top") + session.attached_window.set_window_option("pane-border-format", "#{pane_title}") + pane.cmd("select-pane", "-T", service) + activate_venv( pane, service_name=service, @@ -104,45 +124,63 @@ def tmux_start(bec_path: str, services: dict): session.set_option("mouse", "on") -def tmux_stop(session_name="bec", timeout=5): +def _get_tmux_server() -> libtmux.Server: + """Get tmux server instance, using shared socket if available.""" + if os.path.exists("/tmp/tmux-shared/default"): + return libtmux.Server(socket_path="/tmp/tmux-shared/default") + return libtmux.Server() + + +def _find_pane_by_title(session: libtmux.Session, service_name: str) -> libtmux.Pane | None: """ - Stop the services from the given tmux session. + Find a pane in a session by its title (service name). - 1. Send Ctrl+C (SIGINT) to all panes. - 2. Wait up to `timeout` seconds for processes to exit. - 3. Kill remaining processes if not exited. - 4. Kill the tmux session. + Args: + session (libtmux.Session): Tmux session object + service_name (str): Name of the service to find + + Returns: + libtmux.Pane | None: Pane object if found, None otherwise """ - # connect to tmux server - if os.path.exists("/tmp/tmux-shared/default"): - tmux_server = libtmux.Server(socket_path="/tmp/tmux-shared/default") - else: - tmux_server = libtmux.Server() + for pane in session.panes: + pane_title = pane.display_message("#{pane_title}", get_text=True) + # display_message returns a list when get_text=True + if isinstance(pane_title, list): + pane_title = pane_title[0] if pane_title else "" + if pane_title and pane_title.strip() == service_name: + return pane + return None - avail_sessions = tmux_server.sessions.filter(session_name=session_name) - if not avail_sessions: - return - session = avail_sessions[0] +def _stop_pane_processes(pane: libtmux.Pane, timeout: int = 5) -> list[psutil.Process]: + """ + Stop processes in a pane gracefully, then forcefully if needed. - # collect all child PIDs for panes - all_children = [] - for bash_pid in map(int, [p.pane_pid for p in session.panes]): - try: - parent_proc = psutil.Process(bash_pid) - children = parent_proc.children(recursive=True) - all_children.extend(children) - except psutil.NoSuchProcess: - continue + Args: + pane(libtmux.Pane): Tmux pane object + timeout (int): Timeout in seconds for waiting for process to exit - # Send Ctrl+C to each pane - for pane in session.panes: - pane.send_keys("^C") # sends SIGINT via tmux + Returns: + list[psutil.Process]: List of child processes that were found + """ + # Get child processes before stopping + try: + pane_pid = pane.pane_pid + if pane_pid is None: + return [] + bash_pid = int(pane_pid) + parent_proc = psutil.Process(bash_pid) + children = parent_proc.children(recursive=True) + except psutil.NoSuchProcess: + children = [] + + # Stop the service + pane.send_keys("^C") # Wait for processes to exit start_time = time.time() while time.time() - start_time < timeout: - alive = [p for p in all_children if p.is_running()] + alive = [p for p in children if p.is_running()] if not alive: break time.sleep(0.1) @@ -154,9 +192,133 @@ def tmux_stop(session_name="bec", timeout=5): except psutil.NoSuchProcess: pass + return children + + +def tmux_stop(session_name: str = "bec", timeout: int = 5) -> None: + """ + Stop the services from the given tmux session. + + 1. Send Ctrl+C (SIGINT) to all panes. + 2. Wait up to `timeout` seconds for processes to exit. + 3. Kill remaining processes if not exited. + 4. Kill the tmux session. + + Args: + session_name (str): Name of the tmux session (default: "bec") + timeout (int): Timeout in seconds for waiting for processes to exit (default: 5) + """ + tmux_server = _get_tmux_server() + + avail_sessions = tmux_server.sessions.filter(session_name=session_name) + if not avail_sessions: + return + + session = avail_sessions[0] + + # Stop all panes + for pane in session.panes: + _stop_pane_processes(pane, timeout) + # Kill tmux session try: session.kill_session() except LibTmuxException: # session may already exit itself if all panes are gone pass + + +def tmux_stop_service(service_name: str, session_name: str = "bec", timeout: int = 5) -> bool: + """ + Stop a single service in a specific pane within a tmux session. + + Args: + service_name (str): Name of the service to stop + session_name (str): Name of the tmux session (default: "bec") + timeout (int): Timeout in seconds for waiting for process to exit (default: 5) + + Returns: + bool: True if service was stopped successfully, False otherwise + """ + tmux_server = _get_tmux_server() + + avail_sessions = tmux_server.sessions.filter(session_name=session_name) + if not avail_sessions: + return False + + session = avail_sessions[0] + target_pane = _find_pane_by_title(session, service_name) + + if not target_pane: + return False + + # Stop the service processes + _stop_pane_processes(target_pane, timeout) + return True + + +def tmux_start_service( + bec_path: str, service_name: str, service_config, session_name: str = "bec" +) -> bool: + """ + Start a single service in an existing tmux session. + + Args: + bec_path (str): Path to the BEC source code + service_name (str): Name of the service to start + service_config: ServiceDesc object containing service configuration + session_name (str): Name of the tmux session (default: "bec") + + Returns: + bool: True if service was started successfully, False otherwise + """ + tmux_server = _get_tmux_server() + + avail_sessions = tmux_server.sessions.filter(session_name=session_name) + if not avail_sessions: + return False + + session = avail_sessions[0] + target_pane = _find_pane_by_title(session, service_name) + + if not target_pane: + return False + + # Start the service + activate_venv( + target_pane, + service_name=service_name, + service_path=service_config.path.substitute(base_path=bec_path), + ) + target_pane.send_keys(service_config.command) + + # Wait if a wait function is provided + wait_func = service_config.wait_func + if callable(wait_func): + wait_func() + + return True + + +def tmux_restart_service( + bec_path: str, service_name: str, service_config, session_name: str = "bec", timeout: int = 5 +) -> bool: + """ + Restart a single service in a specific pane within a tmux session. + + Args: + bec_path (str): Path to the BEC source code + service_name (str): Name of the service to restart + service_config: ServiceDesc object containing service configuration + session_name (str): Name of the tmux session (default: "bec") + timeout (int): Timeout in seconds for waiting for process to exit (default: 5) + + Returns: + bool: True if service was restarted successfully, False otherwise + """ + # Stop the service + if not tmux_stop_service(service_name, session_name, timeout): + return False + + # Start the service + return tmux_start_service(bec_path, service_name, service_config, session_name) diff --git a/bec_server/bec_server/scihub/service_handler/service_handler.py b/bec_server/bec_server/scihub/service_handler/service_handler.py index 605e8f9b5..781808cd1 100644 --- a/bec_server/bec_server/scihub/service_handler/service_handler.py +++ b/bec_server/bec_server/scihub/service_handler/service_handler.py @@ -16,25 +16,62 @@ class ServiceHandler: + """ + Handler for service management requests received through Redis messages. + + This handler listens for ServiceRequestMessage messages and executes + the requested actions (e.g., restarting services). + + Args: + connector: RedisConnector instance for message communication + """ + def __init__(self, connector: RedisConnector) -> None: self.connector = connector self.command = f"{sys.executable} -m bec_server.bec_server_utils.launch" def start(self): + """Register the service request message handler.""" self.connector.register( MessageEndpoints.service_request(), cb=self.handle_service_request, parent=self ) @staticmethod def handle_service_request(msg: MessageObject, parent: ServiceHandler) -> None: - message: messages.ServiceRequestMessage = msg.value + """ + Handle incoming service request messages. + + Args: + msg: Message object containing the ServiceRequestMessage + parent: Parent ServiceHandler instance + """ + message = msg.value + if not isinstance(message, messages.ServiceRequestMessage): + return if message.action == "restart": - parent.on_restart() + parent.on_restart(service_name=message.service_name) + + def on_restart(self, service_name: str | None = None): + """ + Restart BEC services. + + Launches a subprocess to restart either all services or a specific service. + The subprocess runs independently and the method returns immediately. + + Args: + service_name: Name of the specific service to restart (e.g., "scan_server", + "device_server"). If None, all services will be restarted. + """ + if service_name: + logger.info(f"Restarting service '{service_name}' through service handler") + command = f"{self.command} restart --service {service_name}" + else: + logger.info("Restarting all services through service handler") + command = f"{self.command} restart" - def on_restart(self): - logger.info("Restarting services through service handler") + # pylint: disable=subprocess-popen-preexec-fn subprocess.Popen( - f"{self.command} restart", + command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, diff --git a/bec_server/tests/tests_bec_server_utils/test_service_handler.py b/bec_server/tests/tests_bec_server_utils/test_cli_service_handler.py similarity index 52% rename from bec_server/tests/tests_bec_server_utils/test_service_handler.py rename to bec_server/tests/tests_bec_server_utils/test_cli_service_handler.py index 0d3287a0a..531603f7d 100644 --- a/bec_server/tests/tests_bec_server_utils/test_service_handler.py +++ b/bec_server/tests/tests_bec_server_utils/test_cli_service_handler.py @@ -64,3 +64,66 @@ def test_service_handler_services(): ) assert service_handler.SERVICES["scan_server"].command == "bec-scan-server" + + +def test_service_handler_restart_service_success(): + """Test successfully restarting a single service""" + bec_path = "/path/to/bec" + config_path = "/path/to/config" + + with mock.patch("bec_server.bec_server_utils.service_handler.sys") as mock_sys: + mock_sys.platform = "linux" + service_handler = ServiceHandler(bec_path, config_path) + + with mock.patch( + "bec_server.bec_server_utils.service_handler.tmux_restart_service" + ) as mock_tmux_restart: + mock_tmux_restart.return_value = True + + result = service_handler.restart_service("scan_server") + + assert result is True + mock_tmux_restart.assert_called_once() + # Verify the service config includes the config path + call_args = mock_tmux_restart.call_args + assert call_args[0][0] == bec_path + assert call_args[0][1] == "scan_server" + assert f"--config {config_path}" in call_args[0][2].command + + +def test_service_handler_restart_service_unknown_service(): + """Test restarting an unknown service""" + service_handler = ServiceHandler("/path/to/bec") + + result = service_handler.restart_service("unknown_service") + + assert result is False + + +def test_service_handler_restart_service_not_found(): + """Test restarting a service that is not running""" + bec_path = "/path/to/bec" + + with mock.patch("bec_server.bec_server_utils.service_handler.sys") as mock_sys: + mock_sys.platform = "linux" + service_handler = ServiceHandler(bec_path) + + with mock.patch( + "bec_server.bec_server_utils.service_handler.tmux_restart_service" + ) as mock_tmux_restart: + mock_tmux_restart.return_value = False + + result = service_handler.restart_service("scan_server") + + assert result is False + + +def test_service_handler_restart_service_unsupported_interface(): + """Test restarting a service with systemctl interface""" + bec_path = "/path/to/bec" + + service_handler = ServiceHandler(bec_path, interface="systemctl") + + result = service_handler.restart_service("scan_server") + + assert result is False diff --git a/bec_server/tests/tests_bec_server_utils/test_main.py b/bec_server/tests/tests_bec_server_utils/test_main.py index fa3e09778..f2e591c34 100644 --- a/bec_server/tests/tests_bec_server_utils/test_main.py +++ b/bec_server/tests/tests_bec_server_utils/test_main.py @@ -7,7 +7,7 @@ def test_main_start(): with mock.patch("bec_server.bec_server_utils.launch.ServiceHandler") as mock_service_handler: with mock.patch("bec_server.bec_server_utils.launch.argparse") as mock_argparse: mock_argparse.ArgumentParser().parse_args.return_value = mock.MagicMock( - command="start", config=None, interface="tmux" + command="start", config=None, interface="tmux", service=None ) main() mock_service_handler.assert_called_once_with( @@ -24,7 +24,7 @@ def test_main_stop(): with mock.patch("bec_server.bec_server_utils.launch.ServiceHandler") as mock_service_handler: with mock.patch("bec_server.bec_server_utils.launch.argparse") as mock_argparse: mock_argparse.ArgumentParser().parse_args.return_value = mock.MagicMock( - command="stop", config=None, interface="tmux" + command="stop", config=None, interface="tmux", service=None ) main() mock_service_handler.assert_called_once_with( @@ -41,7 +41,7 @@ def test_main_restart(): with mock.patch("bec_server.bec_server_utils.launch.ServiceHandler") as mock_service_handler: with mock.patch("bec_server.bec_server_utils.launch.argparse") as mock_argparse: mock_argparse.ArgumentParser().parse_args.return_value = mock.MagicMock( - command="restart", config=None, interface="tmux" + command="restart", config=None, interface="tmux", service=None ) main() mock_service_handler.assert_called_once_with( @@ -58,7 +58,7 @@ def test_main_start_with_config(): with mock.patch("bec_server.bec_server_utils.launch.ServiceHandler") as mock_service_handler: with mock.patch("bec_server.bec_server_utils.launch.argparse") as mock_argparse: mock_argparse.ArgumentParser().parse_args.return_value = mock.MagicMock( - command="start", config="/path/to/config", interface="tmux" + command="start", config="/path/to/config", interface="tmux", service=None ) main() mock_service_handler.assert_called_once_with( @@ -75,7 +75,7 @@ def test_main_restart_with_config(): with mock.patch("bec_server.bec_server_utils.launch.ServiceHandler") as mock_service_handler: with mock.patch("bec_server.bec_server_utils.launch.argparse") as mock_argparse: mock_argparse.ArgumentParser().parse_args.return_value = mock.MagicMock( - command="restart", config="/path/to/config", interface="tmux" + command="restart", config="/path/to/config", interface="tmux", service=None ) main() mock_service_handler.assert_called_once_with( @@ -86,3 +86,20 @@ def test_main_restart_with_config(): no_persistence=False, ) mock_service_handler().restart.assert_called_once() + + +def test_main_restart_single_service(): + with mock.patch("bec_server.bec_server_utils.launch.ServiceHandler") as mock_service_handler: + with mock.patch("bec_server.bec_server_utils.launch.argparse") as mock_argparse: + mock_argparse.ArgumentParser().parse_args.return_value = mock.MagicMock( + command="restart", config=None, interface="tmux", service="scan_server" + ) + main() + mock_service_handler.assert_called_once_with( + bec_path=mock.ANY, + config_path=None, + interface="tmux", + start_redis=False, + no_persistence=False, + ) + mock_service_handler().restart_service.assert_called_once_with("scan_server") diff --git a/bec_server/tests/tests_bec_server_utils/test_tmux_launch.py b/bec_server/tests/tests_bec_server_utils/test_tmux_launch.py index 0a7b1de34..27b93e42c 100644 --- a/bec_server/tests/tests_bec_server_utils/test_tmux_launch.py +++ b/bec_server/tests/tests_bec_server_utils/test_tmux_launch.py @@ -2,7 +2,14 @@ from unittest import mock from bec_server.bec_server_utils.service_handler import ServiceDesc -from bec_server.bec_server_utils.tmux_launch import tmux_start, tmux_stop +from bec_server.bec_server_utils.tmux_launch import ( + activate_venv, + tmux_restart_service, + tmux_start, + tmux_start_service, + tmux_stop, + tmux_stop_service, +) def test_tmux_start(): @@ -41,3 +48,169 @@ def test_tmux_stop_with_sessions(): mock_libtmux_server.Server().sessions.filter.return_value = [session] tmux_stop() session.kill_session.assert_called_once() + + +def test_tmux_restart_service_success(): + """Test successfully restarting a service""" + service_config = ServiceDesc(Template("$base_path/scan_server"), "bec-scan-server") + + with ( + mock.patch("bec_server.bec_server_utils.tmux_launch.libtmux") as mock_libtmux, + mock.patch("bec_server.bec_server_utils.tmux_launch.psutil") as mock_psutil, + mock.patch("bec_server.bec_server_utils.tmux_launch.activate_venv") as mock_activate_venv, + ): + + # Setup mock session and pane + mock_session = mock.MagicMock() + mock_pane = mock.MagicMock() + mock_pane.pane_pid = "12345" + # Mock display_message to return the service name as pane title + mock_pane.display_message.return_value = ["scan_server"] + mock_session.panes = [mock_pane] + mock_libtmux.Server().sessions.filter.return_value = [mock_session] + + # Setup mock process + mock_process = mock.MagicMock() + mock_child = mock.MagicMock() + mock_child.is_running.return_value = False + mock_process.children.return_value = [mock_child] + mock_psutil.Process.return_value = mock_process + + # Call the function + result = tmux_restart_service("/path/to/bec", "scan_server", service_config) + + # Assertions + assert result is True + mock_pane.display_message.assert_called_with("#{pane_title}", get_text=True) + mock_pane.send_keys.assert_any_call("^C") + mock_pane.send_keys.assert_any_call("bec-scan-server") + mock_activate_venv.assert_called_once() + + +def test_tmux_restart_service_not_found(): + """Test restarting a service that is not running""" + service_config = ServiceDesc(Template("$base_path/scan_server"), "bec-scan-server") + + with mock.patch("bec_server.bec_server_utils.tmux_launch.libtmux") as mock_libtmux: + + # Setup mock session with pane running different service + mock_session = mock.MagicMock() + mock_pane = mock.MagicMock() + # Mock display_message to return a different service name + mock_pane.display_message.return_value = ["device_server"] + mock_session.panes = [mock_pane] + mock_libtmux.Server().sessions.filter.return_value = [mock_session] + + # Call the function + result = tmux_restart_service("/path/to/bec", "scan_server", service_config) + + # Assertions + assert result is False + + +def test_tmux_restart_service_no_session(): + """Test restarting a service when no session exists""" + service_config = ServiceDesc(Template("$base_path/scan_server"), "bec-scan-server") + + with mock.patch("bec_server.bec_server_utils.tmux_launch.libtmux") as mock_libtmux: + mock_libtmux.Server().sessions.filter.return_value = [] + + result = tmux_restart_service("/path/to/bec", "scan_server", service_config) + + assert result is False + + +def test_tmux_stop_service_success(): + """Test successfully stopping a single service""" + with ( + mock.patch("bec_server.bec_server_utils.tmux_launch.libtmux") as mock_libtmux, + mock.patch("bec_server.bec_server_utils.tmux_launch.psutil") as mock_psutil, + ): + # Setup mock session and pane + mock_session = mock.MagicMock() + mock_pane = mock.MagicMock() + mock_pane.pane_pid = "12345" + mock_pane.display_message.return_value = ["scan_server"] + mock_session.panes = [mock_pane] + mock_libtmux.Server().sessions.filter.return_value = [mock_session] + + # Setup mock process + mock_process = mock.MagicMock() + mock_child = mock.MagicMock() + mock_child.is_running.return_value = False + mock_process.children.return_value = [mock_child] + mock_psutil.Process.return_value = mock_process + + # Call the function + result = tmux_stop_service("scan_server") + + # Assertions + assert result is True + mock_pane.send_keys.assert_called_with("^C") + + +def test_tmux_start_service_success(): + """Test successfully starting a single service""" + service_config = ServiceDesc(Template("$base_path/scan_server"), "bec-scan-server") + + with ( + mock.patch("bec_server.bec_server_utils.tmux_launch.libtmux") as mock_libtmux, + mock.patch("bec_server.bec_server_utils.tmux_launch.activate_venv") as mock_activate_venv, + ): + # Setup mock session and pane + mock_session = mock.MagicMock() + mock_pane = mock.MagicMock() + mock_pane.display_message.return_value = ["scan_server"] + mock_session.panes = [mock_pane] + mock_libtmux.Server().sessions.filter.return_value = [mock_session] + + # Call the function + result = tmux_start_service("/path/to/bec", "scan_server", service_config) + + # Assertions + assert result is True + mock_activate_venv.assert_called_once_with( + mock_pane, service_name="scan_server", service_path="/path/to/bec/scan_server" + ) + mock_pane.send_keys.assert_called_with("bec-scan-server") + + +def test_activate_venv_with_service_venv(): + """Test activate_venv when service-specific venv exists""" + mock_pane = mock.MagicMock() + + with ( + mock.patch("bec_server.bec_server_utils.tmux_launch.os.path.exists") as mock_exists, + mock.patch("bec_server.bec_server_utils.tmux_launch.os.getenv") as mock_getenv, + ): + # Mock that service_venv exists + mock_exists.side_effect = lambda path: "scan_server_venv" in path + mock_getenv.return_value = None + + activate_venv(mock_pane, "scan_server", "/path/to/bec/scan_server") + + mock_pane.send_keys.assert_called_once_with( + "source /path/to/bec/scan_server/scan_server_venv/bin/activate" + ) + + +def test_activate_venv_with_conda(): + """Test activate_venv when running in Conda environment""" + mock_pane = mock.MagicMock() + + with ( + mock.patch("bec_server.bec_server_utils.tmux_launch.os.path.exists") as mock_exists, + mock.patch("bec_server.bec_server_utils.tmux_launch.os.getenv") as mock_getenv, + mock.patch( + "bec_server.bec_server_utils.tmux_launch.os.environ", + {"CONDA_PREFIX": "/opt/conda/envs/bec-env"}, + ), + ): + # No venv exists + mock_exists.return_value = False + # CONDA_PREFIX is set + mock_getenv.return_value = "/opt/conda/envs/bec-env" + + activate_venv(mock_pane, "scan_server", "/path/to/bec/scan_server") + + mock_pane.send_keys.assert_called_once_with("conda activate bec-env") diff --git a/bec_server/tests/tests_scihub/test_service_handler.py b/bec_server/tests/tests_scihub/test_service_handler.py new file mode 100644 index 000000000..94a1dafcc --- /dev/null +++ b/bec_server/tests/tests_scihub/test_service_handler.py @@ -0,0 +1,109 @@ +from unittest import mock + +from bec_lib import messages +from bec_server.scihub.service_handler.service_handler import ServiceHandler + + +def test_service_handler_initialization(): + """Test ServiceHandler initialization""" + mock_connector = mock.MagicMock() + handler = ServiceHandler(mock_connector) + + assert handler.connector == mock_connector + assert "python" in handler.command + assert "bec_server.bec_server_utils.launch" in handler.command + + +def test_service_handler_start(): + """Test ServiceHandler registers message handler""" + mock_connector = mock.MagicMock() + handler = ServiceHandler(mock_connector) + + handler.start() + + mock_connector.register.assert_called_once() + call_args = mock_connector.register.call_args + assert call_args[1]["cb"] == handler.handle_service_request + assert call_args[1]["parent"] == handler + + +def test_handle_service_request_restart_all(): + """Test handling restart request for all services""" + mock_connector = mock.MagicMock() + handler = ServiceHandler(mock_connector) + + # Create mock message for restarting all services + mock_msg = mock.MagicMock() + mock_msg.value = messages.ServiceRequestMessage(action="restart") + + with mock.patch.object(handler, "on_restart") as mock_on_restart: + ServiceHandler.handle_service_request(mock_msg, handler) + mock_on_restart.assert_called_once_with(service_name=None) + + +def test_handle_service_request_restart_single_service(): + """Test handling restart request for a single service""" + mock_connector = mock.MagicMock() + handler = ServiceHandler(mock_connector) + + # Create mock message for restarting a specific service + mock_msg = mock.MagicMock() + mock_msg.value = messages.ServiceRequestMessage(action="restart", service_name="scan_server") + + with mock.patch.object(handler, "on_restart") as mock_on_restart: + ServiceHandler.handle_service_request(mock_msg, handler) + mock_on_restart.assert_called_once_with(service_name="scan_server") + + +def test_on_restart_all_services(): + """Test restarting all services""" + mock_connector = mock.MagicMock() + handler = ServiceHandler(mock_connector) + + with mock.patch( + "bec_server.scihub.service_handler.service_handler.subprocess.Popen" + ) as mock_popen: + handler.on_restart(service_name=None) + + mock_popen.assert_called_once() + call_args = mock_popen.call_args + command = call_args[0][0] + + assert "restart" in command + assert "--service" not in command + + +def test_on_restart_single_service(): + """Test restarting a single service""" + mock_connector = mock.MagicMock() + handler = ServiceHandler(mock_connector) + + with mock.patch( + "bec_server.scihub.service_handler.service_handler.subprocess.Popen" + ) as mock_popen: + handler.on_restart(service_name="scan_server") + + mock_popen.assert_called_once() + call_args = mock_popen.call_args + command = call_args[0][0] + + assert "restart" in command + assert "--service scan_server" in command + + +def test_on_restart_subprocess_detached(): + """Test that subprocess is launched detached""" + mock_connector = mock.MagicMock() + handler = ServiceHandler(mock_connector) + + with mock.patch( + "bec_server.scihub.service_handler.service_handler.subprocess.Popen" + ) as mock_popen: + handler.on_restart(service_name="device_server") + + call_kwargs = mock_popen.call_args[1] + assert call_kwargs["shell"] is True + assert call_kwargs["stdout"] == mock.ANY + assert call_kwargs["stderr"] == mock.ANY + assert call_kwargs["stdin"] == mock.ANY + assert "preexec_fn" in call_kwargs From c202d1a1d89097176ac611412583489d57b491d2 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Wed, 19 Nov 2025 17:34:48 +0100 Subject: [PATCH 2/3] f - wip --- .../cli}/__init__.py | 0 .../bec_server/bec_server_utils/cli/launch.py | 32 +++++ .../bec_server/bec_server_utils/launch.py | 23 +++- .../bec_server_utils/service_handler.py | 56 +++++---- .../bec_server_utils/supervisor_service.py | 104 +++++++++++++++++ .../bec_server_utils/tmux_launch.py | 37 ++++-- bec_server/bec_server/scihub/scihub.py | 6 - .../scihub/service_handler/service_handler.py | 80 ------------- bec_server/pyproject.toml | 1 + .../tests_scihub/test_service_handler.py | 109 ------------------ 10 files changed, 218 insertions(+), 230 deletions(-) rename bec_server/bec_server/{scihub/service_handler => bec_server_utils/cli}/__init__.py (100%) create mode 100644 bec_server/bec_server/bec_server_utils/cli/launch.py create mode 100644 bec_server/bec_server/bec_server_utils/supervisor_service.py delete mode 100644 bec_server/bec_server/scihub/service_handler/service_handler.py delete mode 100644 bec_server/tests/tests_scihub/test_service_handler.py diff --git a/bec_server/bec_server/scihub/service_handler/__init__.py b/bec_server/bec_server/bec_server_utils/cli/__init__.py similarity index 100% rename from bec_server/bec_server/scihub/service_handler/__init__.py rename to bec_server/bec_server/bec_server_utils/cli/__init__.py diff --git a/bec_server/bec_server/bec_server_utils/cli/launch.py b/bec_server/bec_server/bec_server_utils/cli/launch.py new file mode 100644 index 000000000..be04215d5 --- /dev/null +++ b/bec_server/bec_server/bec_server_utils/cli/launch.py @@ -0,0 +1,32 @@ +# Description: Launch the Supervisor service. +# This script is the entry point for the Supervisor service. It is called either +# by the bec-supervisor entry point or directly from the command line. +import threading + +from bec_lib.bec_service import parse_cmdline_args +from bec_lib.logger import bec_logger +from bec_lib.redis_connector import RedisConnector +from bec_server.bec_server_utils.supervisor_service import SupervisorService + +logger = bec_logger.logger +bec_logger.level = bec_logger.LOGLEVEL.INFO + + +def main(): + """ + Launch the Supervisor service. + """ + _, _, config = parse_cmdline_args() + + supervisor = SupervisorService(config, RedisConnector) + + try: + event = threading.Event() + logger.success("Started Supervisor service") + event.wait() + except KeyboardInterrupt: + supervisor.shutdown() + + +if __name__ == "__main__": + main() diff --git a/bec_server/bec_server/bec_server_utils/launch.py b/bec_server/bec_server/bec_server_utils/launch.py index acfff86fb..e3409af4d 100644 --- a/bec_server/bec_server/bec_server_utils/launch.py +++ b/bec_server/bec_server/bec_server_utils/launch.py @@ -42,6 +42,13 @@ def main(): default=None, help="Stop a specific service only (e.g., scan_server, device_server)", ) + stop.add_argument( + "--skip-service", + type=str, + action="append", + default=None, + help="Skip stopping this service (can be used multiple times)", + ) restart = command.add_parser("restart", help="Restart the BEC server") restart.add_argument( "--config", type=str, default=None, help="Path to the BEC service config file" @@ -58,6 +65,13 @@ def main(): default=None, help="Restart a specific service only (e.g., scan_server, device_server)", ) + restart.add_argument( + "--skip-service", + type=str, + action="append", + default=None, + help="Skip restarting this service (can be used multiple times)", + ) command.add_parser("attach", help="Open the currently running BEC server session") args = parser.parse_args() @@ -82,15 +96,17 @@ def main(): else: service_handler.start() elif args.command == "stop": + skip_services = getattr(args, "skip_service", None) or [] if hasattr(args, "service") and args.service: service_handler.stop_service(args.service) else: - service_handler.stop() + service_handler.stop(skip_services=skip_services) elif args.command == "restart": + skip_services = getattr(args, "skip_service", None) or [] if hasattr(args, "service") and args.service: service_handler.restart_service(args.service) else: - service_handler.restart() + service_handler.restart(skip_services=skip_services) elif args.command == "attach": if os.path.exists("/tmp/tmux-shared/default"): # if we have a shared socket, use it @@ -105,7 +121,4 @@ def main(): if __name__ == "__main__": - import sys - - sys.argv = ["bec-server", "start"] main() diff --git a/bec_server/bec_server/bec_server_utils/service_handler.py b/bec_server/bec_server/bec_server_utils/service_handler.py index 121caf563..a6a9d3240 100644 --- a/bec_server/bec_server/bec_server_utils/service_handler.py +++ b/bec_server/bec_server/bec_server_utils/service_handler.py @@ -12,6 +12,7 @@ import redis from bec_lib.service_config import ServiceConfig +from bec_lib.utils.rpc_utils import rgetattr from bec_server.bec_server_utils.subprocess_launch import subprocess_start, subprocess_stop from bec_server.bec_server_utils.tmux_launch import ( tmux_restart_service, @@ -58,14 +59,16 @@ class ServiceDesc: command: str tmux_session: TmuxSession = field(default_factory=TmuxSession) wait_func: Union[Callable, None] = None + separate_window: bool = False def __eq__(self, other): - if isinstance(other, ServiceDesc): - if other.path.template == self.path.template and self.command == other.command: - if self.tmux_session.name == other.tmux_session.name: - if self.wait_func == other.wait_func: - return True - return False + if not isinstance(other, ServiceDesc): + return False + comp = ["path.template", "command", "tmux_session.name", "wait_func", "separate_window"] + for attr in comp: + if rgetattr(other, attr) != rgetattr(self, attr): + return False + return True class ServiceHandler: @@ -81,6 +84,9 @@ class ServiceHandler: "file_writer": ServiceDesc(Template("$base_path/file_writer"), "bec-file-writer"), "scihub": ServiceDesc(Template("$base_path/scihub"), "bec-scihub"), "data_processing": ServiceDesc(Template("$base_path/data_processing"), "bec-dap"), + "supervisor": ServiceDesc( + Template("$base_path/bec_server_utils"), "bec-supervisor", separate_window=True + ), } def __init__( @@ -201,14 +207,27 @@ def wait_ready(redis_host_port, key=None): f"Unsupported interface: {self.interface}. Supported interfaces are: tmux, iterm2, systemctl, subprocess" ) - def stop(self, processes=None): + def stop(self, processes=None, skip_services: list[str] | None = None): """ Stop the BEC server using the available interface. + + Args: + processes: List of processes to stop (for subprocess interface) + skip_services: List of service names to skip stopping """ + skip_services = skip_services or [] print("Stopping BEC server...") + if skip_services: + print(f"Skipping services: {', '.join(skip_services)}") if self.interface == "tmux": tmux_stop("bec-redis") - tmux_stop("bec") + if skip_services: + # Stop individual services except the ones to skip + for service_name in self.SERVICES.keys(): + if service_name not in skip_services: + tmux_stop_service(service_name, "bec") + else: + tmux_stop("bec") elif self.interface == "iterm2": pass elif self.interface == "systemctl": @@ -220,12 +239,15 @@ def stop(self, processes=None): f"Unsupported interface: {self.interface}. Supported interfaces are: tmux, iterm2, systemctl, subprocess" ) - def restart(self): + def restart(self, skip_services: list[str] | None = None): """ Restart the BEC server using the available interface. + + Args: + skip_services: List of service names to skip restarting """ print("Restarting BEC server...") - self.stop() + self.stop(skip_services=skip_services) self.start() def _validate_service(self, service_name: str) -> tuple[bool, ServiceDesc | None]: @@ -289,21 +311,9 @@ def _handle_service_operation( f"{bcolors.FAIL}Failed to {operation.value.lower()} {service_name}. Service not found in tmux session.{bcolors.ENDC}" ) return success - elif self.interface == "systemctl" and operation == ServiceOperation.RESTARTING: - print( - f"{bcolors.WARNING}Warning: {operation.value} individual services is not supported with systemctl. Please {operation.value.lower()} the entire BEC server.{bcolors.ENDC}" - ) - return False - elif ( - self.interface in ("iterm2", "subprocess") and operation == ServiceOperation.RESTARTING - ): - print( - f"{bcolors.WARNING}Warning: {operation.value} individual services is not supported with {self.interface}. Please {operation.value.lower()} the entire BEC server.{bcolors.ENDC}" - ) - return False else: print( - f"{bcolors.WARNING}Warning: {operation.value} individual services is only supported with tmux interface.{bcolors.ENDC}" + f"{bcolors.FAIL}Error: Service operations are only supported with the 'tmux' interface.{bcolors.ENDC}" ) return False diff --git a/bec_server/bec_server/bec_server_utils/supervisor_service.py b/bec_server/bec_server/bec_server_utils/supervisor_service.py new file mode 100644 index 000000000..bea826c45 --- /dev/null +++ b/bec_server/bec_server/bec_server_utils/supervisor_service.py @@ -0,0 +1,104 @@ +""" +Supervisor service for managing BEC service restart requests. + +This service listens for ServiceRequestMessage messages and executes +the requested actions (e.g., restarting services) by invoking the +appropriate service handler commands. +""" + +from __future__ import annotations + +import os +import subprocess +import sys +from typing import TYPE_CHECKING + +from bec_lib import messages +from bec_lib.bec_service import BECService +from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger +from bec_lib.service_config import ServiceConfig + +if TYPE_CHECKING: # pragma: no cover + from bec_lib.redis_connector import MessageObject, RedisConnector + +logger = bec_logger.logger + + +class SupervisorService(BECService): + """ + Supervisor service for handling service management requests. + + This service listens for ServiceRequestMessage messages and executes + the requested actions (e.g., restarting services) by invoking the + launch utility. + + Args: + config: ServiceConfig instance for service configuration + connector_cls: RedisConnector class for message communication + """ + + def __init__(self, config: ServiceConfig, connector_cls: type[RedisConnector]) -> None: + super().__init__(config, connector_cls, unique_service=True) + self.config = config + self.command = f"{sys.executable} -m bec_server.bec_server_utils.launch" + self._register_handlers() + self.status = messages.BECStatus.RUNNING + + def _register_handlers(self): + """Register the service request message handler.""" + self.connector.register( + MessageEndpoints.service_request(), cb=self._handle_service_request, parent=self + ) + + @staticmethod + def _handle_service_request(msg: MessageObject, parent: SupervisorService) -> None: + """ + Handle incoming service request messages. + + Args: + msg: Message object containing the ServiceRequestMessage + parent: Parent SupervisorService instance + """ + message = msg.value + if not isinstance(message, messages.ServiceRequestMessage): + return + if message.action == "restart": + parent._on_restart(service_name=message.service_name) + + def _on_restart(self, service_name: str | None = None): + """ + Restart BEC services. + + Launches a subprocess to restart either all services or a specific service. + The subprocess runs independently and the method returns immediately. + When restarting all services, the supervisor service itself is skipped. + + Args: + service_name: Name of the specific service to restart (e.g., "scan_server", + "device_server"). If None, all services will be restarted. + """ + if service_name: + logger.info(f"Restarting service '{service_name}' through supervisor") + command = f"{self.command} restart --service {service_name} --interface tmux" + else: + logger.info("Restarting all services through supervisor") + command = f"{self.command} restart --skip-service supervisor" + + # pylint: disable=subprocess-popen-preexec-fn + logger.info(f"Executing command: {command}") + subprocess.Popen( + command, + shell=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + preexec_fn=os.setsid, + ) + + def shutdown(self): + """Shutdown the supervisor service.""" + self.connector.unregister( + MessageEndpoints.service_request(), cb=self._handle_service_request + ) + super().shutdown() diff --git a/bec_server/bec_server/bec_server_utils/tmux_launch.py b/bec_server/bec_server/bec_server_utils/tmux_launch.py index b121e6af3..773224dbd 100644 --- a/bec_server/bec_server/bec_server_utils/tmux_launch.py +++ b/bec_server/bec_server/bec_server_utils/tmux_launch.py @@ -1,10 +1,16 @@ +from __future__ import annotations + import os import time +from typing import TYPE_CHECKING import libtmux import psutil from libtmux.exc import LibTmuxException +if TYPE_CHECKING: + from bec_server.bec_server_utils.service_handler import ServiceDesc + def activate_venv(pane: libtmux.Pane, service_name: str, service_path: str) -> None: """ @@ -73,7 +79,7 @@ def get_new_session(tmux_session_name: str, window_label: str) -> libtmux.Sessio return session -def tmux_start(bec_path: str, services: dict) -> None: +def tmux_start(bec_path: str, services: dict[str, ServiceDesc]) -> None: """ Launch services in a tmux session. All services are launched in separate panes. Services config dict contains "tmux_session_name" (default: "bec") and "window_label" (default: "BEC server", @@ -81,12 +87,16 @@ def tmux_start(bec_path: str, services: dict) -> None: Args: bec_path (str): Path to the BEC source code - services (dict): Dictionary of services to launch. Keys are the service names, values are path and command templates. + services (dict[str, ServiceDesc]): Dictionary of services to launch. Keys are the service names, values are path and command templates. """ - sessions = {} + sessions: dict[str, libtmux.Session] = {} + session_windows: dict[str, libtmux.Window] = {} + for service, service_config in services.items(): tmux_session_name = service_config.tmux_session.name + separate_window = service_config.separate_window + if tmux_session_name not in sessions: tmux_window_label = service_config.tmux_session.window_label session = get_new_session(tmux_session_name, tmux_window_label) @@ -94,16 +104,26 @@ def tmux_start(bec_path: str, services: dict) -> None: raise RuntimeError(f"Failed to create tmux session '{tmux_session_name}'") pane = session.attached_window.active_pane sessions[tmux_session_name] = session + if not separate_window: + session_windows[tmux_session_name] = session.attached_window else: session = sessions[tmux_session_name] - pane = session.attached_window.split_window(vertical=False) + if separate_window: + # Create a new window for this service + window = session.new_window(window_name=service) + pane = window.active_pane + else: + # Split the current window to create a new pane + if tmux_session_name not in session_windows: + session_windows[tmux_session_name] = session.attached_window + pane = session_windows[tmux_session_name].split_window(vertical=False) if pane is None: raise RuntimeError(f"Failed to create pane for service '{service}'") # Set pane title to service name for easy identification - session.attached_window.set_window_option("pane-border-status", "top") - session.attached_window.set_window_option("pane-border-format", "#{pane_title}") + pane.window.set_window_option("pane-border-status", "top") + pane.window.set_window_option("pane-border-format", "#{pane_title}") pane.cmd("select-pane", "-T", service) activate_venv( @@ -118,8 +138,11 @@ def tmux_start(bec_path: str, services: dict) -> None: if callable(wait_func): wait_func() + # Apply tiled layout only to windows with multiple panes (not separate windows) + for window in session_windows.values(): + window.select_layout("tiled") + for session in sessions.values(): - session.attached_window.select_layout("tiled") session.mouse_all_flag = True session.set_option("mouse", "on") diff --git a/bec_server/bec_server/scihub/scihub.py b/bec_server/bec_server/scihub/scihub.py index 322d7c89c..f2279af23 100644 --- a/bec_server/bec_server/scihub/scihub.py +++ b/bec_server/bec_server/scihub/scihub.py @@ -7,7 +7,6 @@ from bec_lib.service_config import ServiceConfig from bec_server.scihub.atlas import AtlasConnector from bec_server.scihub.scilog import SciLogConnector -from bec_server.scihub.service_handler.service_handler import ServiceHandler if TYPE_CHECKING: from bec_lib.redis_connector import RedisConnector @@ -22,7 +21,6 @@ def __init__(self, config: ServiceConfig, connector_cls: type[RedisConnector]) - self.service_handler = None self._start_atlas_connector() self._start_scilog_connector() - self._start_service_handler() self.status = messages.BECStatus.RUNNING def _start_atlas_connector(self): @@ -33,10 +31,6 @@ def _start_atlas_connector(self): def _start_scilog_connector(self): self.scilog_connector = SciLogConnector(self, self.connector) - def _start_service_handler(self): - self.service_handler = ServiceHandler(self.connector) - self.service_handler.start() - def shutdown(self): super().shutdown() if self.atlas_connector: diff --git a/bec_server/bec_server/scihub/service_handler/service_handler.py b/bec_server/bec_server/scihub/service_handler/service_handler.py deleted file mode 100644 index 781808cd1..000000000 --- a/bec_server/bec_server/scihub/service_handler/service_handler.py +++ /dev/null @@ -1,80 +0,0 @@ -from __future__ import annotations - -import os -import subprocess -import sys -from typing import TYPE_CHECKING - -from bec_lib import messages -from bec_lib.endpoints import MessageEndpoints -from bec_lib.logger import bec_logger - -if TYPE_CHECKING: # pragma: no cover - from bec_lib.redis_connector import MessageObject, RedisConnector - -logger = bec_logger.logger - - -class ServiceHandler: - """ - Handler for service management requests received through Redis messages. - - This handler listens for ServiceRequestMessage messages and executes - the requested actions (e.g., restarting services). - - Args: - connector: RedisConnector instance for message communication - """ - - def __init__(self, connector: RedisConnector) -> None: - self.connector = connector - self.command = f"{sys.executable} -m bec_server.bec_server_utils.launch" - - def start(self): - """Register the service request message handler.""" - self.connector.register( - MessageEndpoints.service_request(), cb=self.handle_service_request, parent=self - ) - - @staticmethod - def handle_service_request(msg: MessageObject, parent: ServiceHandler) -> None: - """ - Handle incoming service request messages. - - Args: - msg: Message object containing the ServiceRequestMessage - parent: Parent ServiceHandler instance - """ - message = msg.value - if not isinstance(message, messages.ServiceRequestMessage): - return - if message.action == "restart": - parent.on_restart(service_name=message.service_name) - - def on_restart(self, service_name: str | None = None): - """ - Restart BEC services. - - Launches a subprocess to restart either all services or a specific service. - The subprocess runs independently and the method returns immediately. - - Args: - service_name: Name of the specific service to restart (e.g., "scan_server", - "device_server"). If None, all services will be restarted. - """ - if service_name: - logger.info(f"Restarting service '{service_name}' through service handler") - command = f"{self.command} restart --service {service_name}" - else: - logger.info("Restarting all services through service handler") - command = f"{self.command} restart" - - # pylint: disable=subprocess-popen-preexec-fn - subprocess.Popen( - command, - shell=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - preexec_fn=os.setsid, - ) diff --git a/bec_server/pyproject.toml b/bec_server/pyproject.toml index 90cd34858..62a1f27bf 100644 --- a/bec_server/pyproject.toml +++ b/bec_server/pyproject.toml @@ -48,6 +48,7 @@ bec-file-writer = "bec_server.file_writer.cli.launch:main" bec-scan-server = "bec_server.scan_server.cli.launch:main" bec-scan-bundler = "bec_server.scan_bundler.cli.launch:main" bec-scihub = "bec_server.scihub.cli.launch:main" +bec-supervisor = "bec_server.bec_server_utils.cli.launch:main" bec-server = "bec_server:main" bec-procedure-worker = "bec_server.scan_server.procedures.container_worker:main" diff --git a/bec_server/tests/tests_scihub/test_service_handler.py b/bec_server/tests/tests_scihub/test_service_handler.py deleted file mode 100644 index 94a1dafcc..000000000 --- a/bec_server/tests/tests_scihub/test_service_handler.py +++ /dev/null @@ -1,109 +0,0 @@ -from unittest import mock - -from bec_lib import messages -from bec_server.scihub.service_handler.service_handler import ServiceHandler - - -def test_service_handler_initialization(): - """Test ServiceHandler initialization""" - mock_connector = mock.MagicMock() - handler = ServiceHandler(mock_connector) - - assert handler.connector == mock_connector - assert "python" in handler.command - assert "bec_server.bec_server_utils.launch" in handler.command - - -def test_service_handler_start(): - """Test ServiceHandler registers message handler""" - mock_connector = mock.MagicMock() - handler = ServiceHandler(mock_connector) - - handler.start() - - mock_connector.register.assert_called_once() - call_args = mock_connector.register.call_args - assert call_args[1]["cb"] == handler.handle_service_request - assert call_args[1]["parent"] == handler - - -def test_handle_service_request_restart_all(): - """Test handling restart request for all services""" - mock_connector = mock.MagicMock() - handler = ServiceHandler(mock_connector) - - # Create mock message for restarting all services - mock_msg = mock.MagicMock() - mock_msg.value = messages.ServiceRequestMessage(action="restart") - - with mock.patch.object(handler, "on_restart") as mock_on_restart: - ServiceHandler.handle_service_request(mock_msg, handler) - mock_on_restart.assert_called_once_with(service_name=None) - - -def test_handle_service_request_restart_single_service(): - """Test handling restart request for a single service""" - mock_connector = mock.MagicMock() - handler = ServiceHandler(mock_connector) - - # Create mock message for restarting a specific service - mock_msg = mock.MagicMock() - mock_msg.value = messages.ServiceRequestMessage(action="restart", service_name="scan_server") - - with mock.patch.object(handler, "on_restart") as mock_on_restart: - ServiceHandler.handle_service_request(mock_msg, handler) - mock_on_restart.assert_called_once_with(service_name="scan_server") - - -def test_on_restart_all_services(): - """Test restarting all services""" - mock_connector = mock.MagicMock() - handler = ServiceHandler(mock_connector) - - with mock.patch( - "bec_server.scihub.service_handler.service_handler.subprocess.Popen" - ) as mock_popen: - handler.on_restart(service_name=None) - - mock_popen.assert_called_once() - call_args = mock_popen.call_args - command = call_args[0][0] - - assert "restart" in command - assert "--service" not in command - - -def test_on_restart_single_service(): - """Test restarting a single service""" - mock_connector = mock.MagicMock() - handler = ServiceHandler(mock_connector) - - with mock.patch( - "bec_server.scihub.service_handler.service_handler.subprocess.Popen" - ) as mock_popen: - handler.on_restart(service_name="scan_server") - - mock_popen.assert_called_once() - call_args = mock_popen.call_args - command = call_args[0][0] - - assert "restart" in command - assert "--service scan_server" in command - - -def test_on_restart_subprocess_detached(): - """Test that subprocess is launched detached""" - mock_connector = mock.MagicMock() - handler = ServiceHandler(mock_connector) - - with mock.patch( - "bec_server.scihub.service_handler.service_handler.subprocess.Popen" - ) as mock_popen: - handler.on_restart(service_name="device_server") - - call_kwargs = mock_popen.call_args[1] - assert call_kwargs["shell"] is True - assert call_kwargs["stdout"] == mock.ANY - assert call_kwargs["stderr"] == mock.ANY - assert call_kwargs["stdin"] == mock.ANY - assert "preexec_fn" in call_kwargs From e83cabcb62c35a0d14ef67ffc68e47eecf629a92 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Wed, 19 Nov 2025 21:08:19 +0100 Subject: [PATCH 3/3] f - wip --- bec_server/bec_server/bec_server_utils/supervisor_service.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bec_server/bec_server/bec_server_utils/supervisor_service.py b/bec_server/bec_server/bec_server_utils/supervisor_service.py index bea826c45..eb04cc239 100644 --- a/bec_server/bec_server/bec_server_utils/supervisor_service.py +++ b/bec_server/bec_server/bec_server_utils/supervisor_service.py @@ -80,9 +80,14 @@ def _on_restart(self, service_name: str | None = None): """ if service_name: logger.info(f"Restarting service '{service_name}' through supervisor") + # Note: We use --interface tmux to ensure that we skip systemctl and subprocess interfaces, + # which does not support restarting individual services. command = f"{self.command} restart --service {service_name} --interface tmux" else: logger.info("Restarting all services through supervisor") + # Note: Here, we do not need to specify the interface. If we are in tmux, + # we skip the supervisor service. For systemctl, the supervisor service does + # not need to be kept alive during restart as systemctl is sufficiently isolated. command = f"{self.command} restart --skip-service supervisor" # pylint: disable=subprocess-popen-preexec-fn