Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bec_lib/bec_lib/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,11 +1178,13 @@ class ServiceRequestMessage(BECMessage):

Args:
action (Literal["restart"]): Action to be executed by the service
service_name (str | None): Name of the service to be restarted. If None, all services will be restarted.
metadata (dict, optional): Metadata. Defaults to None.
"""

msg_type: ClassVar[str] = "service_request_message"
action: Literal["restart"]
service_name: str | None = None


class ProcedureRequestMessage(BECMessage):
Expand Down
32 changes: 32 additions & 0 deletions bec_server/bec_server/bec_server_utils/cli/launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Description: Launch the Supervisor service.
# This script is the entry point for the Supervisor service. It is called either
# by the bec-supervisor entry point or directly from the command line.
import threading

from bec_lib.bec_service import parse_cmdline_args
from bec_lib.logger import bec_logger
from bec_lib.redis_connector import RedisConnector
from bec_server.bec_server_utils.supervisor_service import SupervisorService

# Module-level logger used by this launcher; the shared bec_logger level is
# set to INFO as a side effect of importing this module.
logger = bec_logger.logger
bec_logger.level = bec_logger.LOGLEVEL.INFO


def main():
    """
    Run the Supervisor service until the process is interrupted.

    Command-line arguments are parsed with ``parse_cmdline_args``; only the
    third element of its result (the service config) is used to construct the
    ``SupervisorService``. The calling thread then blocks on a local event, and
    a ``KeyboardInterrupt`` triggers ``supervisor.shutdown()``.
    """
    _, _, config = parse_cmdline_args()
    supervisor = SupervisorService(config, RedisConnector)

    try:
        # The event is local and never set in this function; waiting on it
        # simply parks the calling thread until an interrupt arrives.
        keep_alive = threading.Event()
        logger.success("Started Supervisor service")
        keep_alive.wait()
    except KeyboardInterrupt:
        supervisor.shutdown()


# Allow running this module directly as a script.
if __name__ == "__main__":
    main()
54 changes: 47 additions & 7 deletions bec_server/bec_server/bec_server_utils/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,26 @@ def main():
default=None,
help="Interface to use (tmux, iterm2, systemctl, subprocess)",
)
command.add_parser("stop", help="Stop the BEC server")
start.add_argument(
"--service",
type=str,
default=None,
help="Start a specific service only (e.g., scan_server, device_server)",
)
stop = command.add_parser("stop", help="Stop the BEC server")
stop.add_argument(
"--service",
type=str,
default=None,
help="Stop a specific service only (e.g., scan_server, device_server)",
)
stop.add_argument(
"--skip-service",
type=str,
action="append",
default=None,
help="Skip stopping this service (can be used multiple times)",
)
restart = command.add_parser("restart", help="Restart the BEC server")
restart.add_argument(
"--config", type=str, default=None, help="Path to the BEC service config file"
Expand All @@ -40,6 +59,19 @@ def main():
default=None,
help="Interface to use (tmux, iterm2, systemctl, subprocess)",
)
restart.add_argument(
"--service",
type=str,
default=None,
help="Restart a specific service only (e.g., scan_server, device_server)",
)
restart.add_argument(
"--skip-service",
type=str,
action="append",
default=None,
help="Skip restarting this service (can be used multiple times)",
)
command.add_parser("attach", help="Open the currently running BEC server session")

args = parser.parse_args()
Expand All @@ -59,11 +91,22 @@ def main():
no_persistence=args.no_persistence if "no_persistence" in args else False,
)
if args.command == "start":
service_handler.start()
if hasattr(args, "service") and args.service:
service_handler.start_service(args.service)
else:
service_handler.start()
elif args.command == "stop":
service_handler.stop()
skip_services = getattr(args, "skip_service", None) or []
if hasattr(args, "service") and args.service:
service_handler.stop_service(args.service)
else:
service_handler.stop(skip_services=skip_services)
elif args.command == "restart":
service_handler.restart()
skip_services = getattr(args, "skip_service", None) or []
if hasattr(args, "service") and args.service:
service_handler.restart_service(args.service)
else:
service_handler.restart(skip_services=skip_services)
elif args.command == "attach":
if os.path.exists("/tmp/tmux-shared/default"):
# if we have a shared socket, use it
Expand All @@ -78,7 +121,4 @@ def main():


if __name__ == "__main__":
    # Use the arguments actually supplied on the command line. sys.argv must
    # not be overwritten here, otherwise every direct invocation of this module
    # would be forced to behave like ``bec-server start``.
    main()
168 changes: 157 additions & 11 deletions bec_server/bec_server/bec_server_utils/service_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,30 @@
import sys
import time
from dataclasses import dataclass, field
from enum import Enum
from string import Template
from typing import Callable, Literal, Union

import redis

from bec_lib.service_config import ServiceConfig
from bec_lib.utils.rpc_utils import rgetattr
from bec_server.bec_server_utils.subprocess_launch import subprocess_start, subprocess_stop
from bec_server.bec_server_utils.tmux_launch import tmux_start, tmux_stop
from bec_server.bec_server_utils.tmux_launch import (
tmux_restart_service,
tmux_start,
tmux_start_service,
tmux_stop,
tmux_stop_service,
)


class ServiceOperation(str, Enum):
    """
    Operations that can be applied to a single service.

    Each member's value is the capitalised progress form of the operation.
    Because the enum also derives from ``str``, members compare equal to
    those plain strings.
    """

    STARTING = "Starting"
    STOPPING = "Stopping"
    RESTARTING = "Restarting"


class bcolors:
Expand Down Expand Up @@ -43,14 +59,16 @@ class ServiceDesc:
command: str
tmux_session: TmuxSession = field(default_factory=TmuxSession)
wait_func: Union[Callable, None] = None
separate_window: bool = False

def __eq__(self, other):
    """
    Compare two service descriptions attribute by attribute.

    Two ``ServiceDesc`` objects are equal when ``path.template``, ``command``,
    ``tmux_session.name``, ``wait_func`` and ``separate_window`` all match.
    Any object that is not a ``ServiceDesc`` compares unequal.

    Args:
        other: Object to compare against.

    Returns:
        bool: True if every compared attribute is equal, False otherwise.
    """
    if not isinstance(other, ServiceDesc):
        return False
    # Dotted names are handed to rgetattr, which presumably resolves nested
    # attributes, so the Template string and the tmux session name are
    # compared rather than their wrapper objects.
    compared_attrs = (
        "path.template",
        "command",
        "tmux_session.name",
        "wait_func",
        "separate_window",
    )
    return not any(rgetattr(other, attr) != rgetattr(self, attr) for attr in compared_attrs)


class ServiceHandler:
Expand All @@ -66,6 +84,9 @@ class ServiceHandler:
"file_writer": ServiceDesc(Template("$base_path/file_writer"), "bec-file-writer"),
"scihub": ServiceDesc(Template("$base_path/scihub"), "bec-scihub"),
"data_processing": ServiceDesc(Template("$base_path/data_processing"), "bec-dap"),
"supervisor": ServiceDesc(
Template("$base_path/bec_server_utils"), "bec-supervisor", separate_window=True
),
}

def __init__(
Expand Down Expand Up @@ -186,14 +207,27 @@ def wait_ready(redis_host_port, key=None):
f"Unsupported interface: {self.interface}. Supported interfaces are: tmux, iterm2, systemctl, subprocess"
)

def stop(self, processes=None):
def stop(self, processes=None, skip_services: list[str] | None = None):
"""
Stop the BEC server using the available interface.

Args:
processes: List of processes to stop (for subprocess interface)
skip_services: List of service names to skip stopping
"""
skip_services = skip_services or []
print("Stopping BEC server...")
if skip_services:
print(f"Skipping services: {', '.join(skip_services)}")
if self.interface == "tmux":
tmux_stop("bec-redis")
tmux_stop("bec")
if skip_services:
# Stop individual services except the ones to skip
for service_name in self.SERVICES.keys():
if service_name not in skip_services:
tmux_stop_service(service_name, "bec")
else:
tmux_stop("bec")
elif self.interface == "iterm2":
pass
elif self.interface == "systemctl":
Expand All @@ -205,10 +239,122 @@ def stop(self, processes=None):
f"Unsupported interface: {self.interface}. Supported interfaces are: tmux, iterm2, systemctl, subprocess"
)

def restart(self):
def restart(self, skip_services: list[str] | None = None):
"""
Restart the BEC server using the available interface.

Args:
skip_services: List of service names to skip restarting
"""
print("Restarting BEC server...")
self.stop()
self.stop(skip_services=skip_services)
self.start()

def _validate_service(self, service_name: str) -> tuple[bool, ServiceDesc | None]:
    """
    Look up a service by name and prepare its launch description.

    Args:
        service_name (str): Name of the service to look up in ``SERVICES``

    Returns:
        tuple: ``(False, None)`` after printing an error when the name is unknown;
            otherwise ``(True, desc)`` where ``desc`` is a deep copy of the
            registered ``ServiceDesc`` with ``--config <config_path>`` appended to
            its command when a config path is set.
    """
    if service_name in self.SERVICES:
        # Work on a copy so the registered SERVICES entry is never modified.
        desc = copy.deepcopy(self.SERVICES[service_name])
        if self.config_path:
            desc.command += f" --config {self.config_path}"
        return True, desc

    available = ", ".join(self.SERVICES.keys())
    print(
        f"{bcolors.FAIL}Error: Unknown service '{service_name}'. Available services: {available}{bcolors.ENDC}"
    )
    return False, None

def _handle_service_operation(
    self,
    service_name: str,
    operation: ServiceOperation,
    tmux_func: Callable,
    require_config: bool = True,
) -> bool:
    """
    Run a single-service operation (start, stop, restart) with shared validation and reporting.

    The service name is validated first; the operation is then executed only
    when the handler's interface is ``"tmux"``.

    Args:
        service_name (str): Name of the service
        operation (ServiceOperation): Operation type (STARTING, STOPPING, RESTARTING)
        tmux_func (Callable): Function to call for the tmux interface. It is called as
            ``tmux_func(self.bec_path, service_name, service_config)`` when
            ``require_config`` is True, otherwise as ``tmux_func(service_name)``.
        require_config (bool): Whether the operation requires the service config

    Returns:
        bool: The value returned by ``tmux_func``, or False if the service name is
            unknown or the interface is not ``"tmux"``.
    """
    is_valid, service_config = self._validate_service(service_name)
    if not is_valid:
        return False

    if self.interface != "tmux":
        print(
            f"{bcolors.FAIL}Error: Service operations are only supported with the 'tmux' interface.{bcolors.ENDC}"
        )
        return False

    # (infinitive, past form) per operation, so the result messages read
    # "Successfully started ..." / "Failed to start ..." instead of reusing the
    # progress form ("Successfully starting ...", "Failed to starting ...").
    verb_forms = {
        ServiceOperation.STARTING: ("start", "started"),
        ServiceOperation.STOPPING: ("stop", "stopped"),
        ServiceOperation.RESTARTING: ("restart", "restarted"),
    }
    fallback = operation.value.lower()
    infinitive, past = verb_forms.get(operation, (fallback, fallback))

    print(f"{operation.value} {service_name}...")
    if require_config:
        success = tmux_func(self.bec_path, service_name, service_config)
    else:
        success = tmux_func(service_name)

    if success:
        print(f"{bcolors.OKGREEN}Successfully {past} {service_name}{bcolors.ENDC}")
    else:
        print(
            f"{bcolors.FAIL}Failed to {infinitive} {service_name}. Service not found in tmux session.{bcolors.ENDC}"
        )
    return success

def restart_service(self, service_name: str) -> bool:
    """
    Restart one named service.

    Delegates to ``_handle_service_operation`` with ``tmux_restart_service`` as
    the tmux callable and the service config required.

    Args:
        service_name (str): Name of the service to restart (e.g., "scan_server", "device_server")

    Returns:
        bool: True if service was restarted successfully, False otherwise
    """
    return self._handle_service_operation(
        service_name=service_name,
        operation=ServiceOperation.RESTARTING,
        tmux_func=tmux_restart_service,
        require_config=True,
    )

def start_service(self, service_name: str) -> bool:
    """
    Start one named service.

    Delegates to ``_handle_service_operation`` with ``tmux_start_service`` as
    the tmux callable and the service config required.

    Args:
        service_name (str): Name of the service to start (e.g., "scan_server", "device_server")

    Returns:
        bool: True if service was started successfully, False otherwise
    """
    return self._handle_service_operation(
        service_name=service_name,
        operation=ServiceOperation.STARTING,
        tmux_func=tmux_start_service,
        require_config=True,
    )

def stop_service(self, service_name: str) -> bool:
    """
    Stop one named service.

    Delegates to ``_handle_service_operation`` with ``tmux_stop_service`` as
    the tmux callable; no service config is passed to it.

    Args:
        service_name (str): Name of the service to stop (e.g., "scan_server", "device_server")

    Returns:
        bool: True if service was stopped successfully, False otherwise
    """
    return self._handle_service_operation(
        service_name=service_name,
        operation=ServiceOperation.STOPPING,
        tmux_func=tmux_stop_service,
        require_config=False,
    )
Loading
Loading