diff --git a/.gitignore b/.gitignore index b9286b30..c5333a39 100644 --- a/.gitignore +++ b/.gitignore @@ -209,3 +209,9 @@ renders/ # pixi environments .pixi/* !.pixi/config.toml + +# RL training artifacts +outputs/ +wandb/ +mappo_*/ +*.gif diff --git a/main.py b/main.py index 2b783589..bfb4c138 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ from utama_core.strategy.examples import ( DefenceStrategy, GoToBallExampleStrategy, - PointCycleStrategy, + RandomMovementStrategy, RobotPlacementStrategy, StartupStrategy, TwoRobotPlacementStrategy, @@ -18,7 +18,7 @@ def main(): custom_bounds = FieldBounds(top_left=(2.25, 1.5), bottom_right=(4.5, -1.5)) runner = StrategyRunner( - strategy=PointCycleStrategy(n_robots=2, field_bounds=custom_bounds, endpoint_tolerance=0.1, seed=42), + strategy=RandomMovementStrategy(n_robots=2, field_bounds=custom_bounds, endpoint_tolerance=0.1, seed=42), my_team_is_yellow=True, my_team_is_right=True, mode="rsim", diff --git a/utama_core/config/settings.py b/utama_core/config/settings.py index e2a2bce3..d255f4aa 100644 --- a/utama_core/config/settings.py +++ b/utama_core/config/settings.py @@ -44,3 +44,6 @@ ### Refiners ### BALL_MERGE_THRESHOLD = 0.05 # CameraCombiner: distance threshold to merge balls (m) +VISION_BOUNDS_BUFFER = 1.0 # CameraCombiner: buffer around field bounds to include in vision (m) + +OFF_PITCH_OFFSET = VISION_BOUNDS_BUFFER * 5 # distance outside field bounds to consider as off-pitch (m) diff --git a/utama_core/data_processing/refiners/position.py b/utama_core/data_processing/refiners/position.py index 5851bbf5..5ff1a2d8 100644 --- a/utama_core/data_processing/refiners/position.py +++ b/utama_core/data_processing/refiners/position.py @@ -5,7 +5,7 @@ import numpy as np -from utama_core.config.settings import BALL_MERGE_THRESHOLD +from utama_core.config.settings import BALL_MERGE_THRESHOLD, VISION_BOUNDS_BUFFER from utama_core.data_processing.refiners.base_refiner import BaseRefiner from utama_core.data_processing.refiners.filters.kalman import ( KalmanFilter, @@ -40,19 +40,28 @@ class VisionBounds: class PositionRefiner(BaseRefiner): + """ + Refiner that combines vision data from multiple cameras, applies bounds filtering, + and optionally applies Kalman filtering for smoothing and imputing vanished robots. + + Important: + when exp_ball set to False, the refiner could return either Ball | None type ball value + when exp_ball set to True, the refiner will ALWAYS return a Ball type ball value (will impute missing frames) + """ + def __init__( self, field_bounds: FieldBounds, - bounds_buffer: float = 1.0, filtering: bool = True, + exp_ball: bool = True, ): # alpha=0 means no change in angle (inf smoothing), alpha=1 means no smoothing self.angle_smoother = AngleSmoother(alpha=1) self.vision_bounds = VisionBounds( - x_min=field_bounds.top_left[0] - bounds_buffer, # expand left - x_max=field_bounds.bottom_right[0] + bounds_buffer, # expand right - y_min=field_bounds.bottom_right[1] - bounds_buffer, # expand bottom - y_max=field_bounds.top_left[1] + bounds_buffer, # expand top + x_min=field_bounds.top_left[0] - VISION_BOUNDS_BUFFER, # expand left + x_max=field_bounds.bottom_right[0] + VISION_BOUNDS_BUFFER, # expand right + y_min=field_bounds.bottom_right[1] - VISION_BOUNDS_BUFFER, # expand bottom + y_max=field_bounds.top_left[1] + VISION_BOUNDS_BUFFER, # expand top ) # For Kalman filtering and imputing vanished values. @@ -61,6 +70,8 @@ def __init__( False # Only start filtering once we have valid data to filter (i.e. after the first valid game frame) ) + self.exp_ball = exp_ball + if self.filtering: # Instantiate a dedicated Kalman filter for each robot so filtering can be kept independent. self.kalman_filters_yellow: dict[int, KalmanFilter] = {} @@ -138,18 +149,20 @@ def refine(self, game_frame: GameFrame, data: List[RawVisionData]) -> GameFrame: ) # After the balls have been combined, take the most confident - new_ball: Ball = PositionRefiner._get_most_confident_ball(combined_vision_data.balls) - - # For filtering and vanishing - if self.filtering and self._filter_running: - new_ball = self.kalman_filter_ball.filter_data( - new_ball, - game_frame.ball, - time_elapsed, - ) - elif new_ball is None: - # If none, take the ball from the last frame of the game - new_ball = game_frame.ball + new_ball = PositionRefiner._get_most_confident_ball(combined_vision_data.balls) + + # Skip filtering when there's no ball and we don't expect one + if new_ball is not None or self.exp_ball: + # For filtering and vanishing + if self.filtering and self._filter_running: + new_ball = self.kalman_filter_ball.filter_data( + new_ball, + game_frame.ball, + time_elapsed, + ) + elif new_ball is None: + # If none, take the ball from the last frame of the game + new_ball = game_frame.ball if game_frame.my_team_is_yellow: new_game_frame = replace( diff --git a/utama_core/global_utils/math_utils.py b/utama_core/global_utils/math_utils.py index 926f2b93..f0db48e0 100644 --- a/utama_core/global_utils/math_utils.py +++ b/utama_core/global_utils/math_utils.py @@ -113,6 +113,23 @@ def compute_bounding_zone_from_points( return FieldBounds(top_left=(min_x, max_y), bottom_right=(max_x, min_y)) +def in_field_bounds(point: Tuple[float, float] | Vector2D, bounding_box: FieldBounds) -> bool: + """Check if a point is within a given bounding box. + + Args: + point (tuple or Vector2D): The (x, y) coordinates of the point to check. + bounding_box (FieldBounds): The bounding box defined by its top-left and bottom-right corners. + + Returns: + bool: True if the point is within the bounding box, False otherwise. + """ + x, y = point + return ( + bounding_box.top_left[0] <= x <= bounding_box.bottom_right[0] + and bounding_box.bottom_right[1] <= y <= bounding_box.top_left[1] + ) + + def assert_valid_bounding_box(bb: FieldBounds): """Asserts that a FieldBounds object is valid, raising an AssertionError if not.""" fx, fy = Field._FULL_FIELD_HALF_LENGTH, Field._FULL_FIELD_HALF_WIDTH diff --git a/utama_core/rsoccer_simulator/src/ssl/envs/standard_ssl.py b/utama_core/rsoccer_simulator/src/ssl/envs/standard_ssl.py index 58faf973..1eb026da 100644 --- a/utama_core/rsoccer_simulator/src/ssl/envs/standard_ssl.py +++ b/utama_core/rsoccer_simulator/src/ssl/envs/standard_ssl.py @@ -92,6 +92,7 @@ def __init__( time_step=time_step, render_mode=render_mode, ) + # Note: observation_space and action_space removed - not needed for non-RL use # set starting formation style for @@ -354,7 +355,7 @@ def _dribbler_release_kick( def _calculate_reward_and_done(self): return 1, False - def _get_initial_positions_frame(self): + def _get_initial_positions_frame(self, ball_exists: bool = True) -> Frame: """Returns the position of each robot and ball for the initial frame (random placement)""" pos_frame: Frame = Frame() @@ -366,7 +367,13 @@ def _get_initial_positions_frame(self): x, y, heading = self.yellow_formation[i] pos_frame.robots_yellow[i] = Robot(id=i, x=x, y=-y, theta=-rad_to_deg(heading)) - pos_frame.ball = Ball(x=0, y=0) + if ball_exists: + pos_frame.ball = Ball(x=0, y=0) + else: + pos_frame.ball = Ball( + x=self.OFF_FIELD_OFFSET + self.field.length, + y=self.OFF_FIELD_OFFSET + self.field.width, + ) return pos_frame diff --git a/utama_core/rsoccer_simulator/src/ssl/ssl_gym_base.py b/utama_core/rsoccer_simulator/src/ssl/ssl_gym_base.py index 962ef5d4..6f023518 100644 --- a/utama_core/rsoccer_simulator/src/ssl/ssl_gym_base.py +++ b/utama_core/rsoccer_simulator/src/ssl/ssl_gym_base.py @@ -33,6 +33,7 @@ class SSLBaseEnv: "render.fps": 60, } NORM_BOUNDS = 1.2 + OFF_FIELD_OFFSET = 100.0 # Meters outside of the field to place ball when not needed. def __init__( self, @@ -51,6 +52,7 @@ def __init__( n_robots_yellow=n_robots_yellow, time_step_ms=int(self.time_step * 1000), ) + self.n_robots_blue: int = n_robots_blue self.n_robots_yellow: int = n_robots_yellow @@ -149,9 +151,14 @@ def close(self): ### CUSTOM FUNCTIONS WE ADDED ### - def teleport_ball(self, x: float, y: float, vx: float = 0, vy: float = 0): - """Teleport ball to new position in meters. + def step_noop(self): + """Step the environment without any action.""" + observation = self._frame_to_observations()[0] + self.steps += 1 + return observation + def teleport_ball(self, x: float, y: float, vx: float = 0, vy: float = 0): + """Teleport ball to a new position on the field in meters. Note: this does not create a new frame, but mutates the current frame """ ball = Ball(x=x, y=-y, z=self.frame.ball.z, v_x=vx, v_y=-vy) @@ -161,13 +168,12 @@ def teleport_ball(self, x: float, y: float, vx: float = 0, vy: float = 0): def teleport_robot( self, is_team_yellow: bool, - robot_id: bool, + robot_id: int, x: float, y: float, theta: float = None, ): - """Teleport robot to new position in meters, radians. - + """Teleport robot to a new position on the field in meters, radians. Note: this does not create a new frame, but mutates the current frame """ if theta is None: @@ -312,8 +318,14 @@ def _calculate_reward_and_done(self): """Returns reward value and done flag from type List[Robot] state.""" raise NotImplementedError - def _get_initial_positions_frame(self) -> Frame: - """Returns frame with robots initial positions.""" + def _get_initial_positions_frame(self, ball_exists: bool = True) -> Frame: + """ + Returns frame with robots initial positions. + Args: + ball_exists (bool): Whether the ball should be placed in the center of the field or removed from the field. + Returns: + Frame: frame with initial positions of robots and ball (if ball_exists=True) + """ raise NotImplementedError def norm_pos(self, pos): diff --git a/utama_core/run/game_gater.py b/utama_core/run/game_gater.py index e323dd5f..867156d8 100644 --- a/utama_core/run/game_gater.py +++ b/utama_core/run/game_gater.py @@ -14,6 +14,7 @@ def wait_until_game_valid( my_team_is_right: bool, exp_friendly: int, exp_enemy: int, + exp_ball: bool, vision_buffers: List[Deque[RawVisionData]], position_refiner: PositionRefiner, is_pvp: bool, @@ -30,8 +31,8 @@ def wait_until_game_valid( def _add_frame(my_game_frame: GameFrame, opp_game_frame: GameFrame) -> Tuple[GameFrame, Optional[GameFrame]]: if rsim_env: - vision_frames = [rsim_env._frame_to_observations()[0]] - rsim_env.steps += 1 # Increment the step count to simulate time passing in the environment + obs = rsim_env.step_noop() # Step the environment without action to get the latest observation + vision_frames = [obs] else: vision_frames = [buffer.popleft() if buffer else None for buffer in vision_buffers] my_game_frame = position_refiner.refine(my_game_frame, vision_frames) @@ -54,14 +55,21 @@ def _add_frame(my_game_frame: GameFrame, opp_game_frame: GameFrame) -> Tuple[Gam while ( len(my_game_frame.friendly_robots) < exp_friendly or len(my_game_frame.enemy_robots) < exp_enemy - or my_game_frame.ball is None + or (my_game_frame.ball is None and exp_ball) ): if time.time() - start_time > wait_before_warn: start_time = time.time() print("Waiting for valid game frame...") print(f"Friendly robots: {len(my_game_frame.friendly_robots)}/{exp_friendly}") print(f"Enemy robots: {len(my_game_frame.enemy_robots)}/{exp_enemy}") - print(f"Ball present: {my_game_frame.ball is not None}\n") + print(f"Ball present: {my_game_frame.ball is not None} (exp: {exp_ball})\n") + + # nothing will change in rsim if we don't step it. + # if no valid frame, likely misconfigured. + if rsim_env: + raise TimeoutError( + f"Rsim environment did not produce a valid game frame after {wait_before_warn} seconds. Check the environment setup and vision data." + ) time.sleep(0.05) my_game_frame, opp_game_frame = _add_frame(my_game_frame, opp_game_frame) diff --git a/utama_core/run/strategy_runner.py b/utama_core/run/strategy_runner.py index 510ad77c..f2b1f602 100644 --- a/utama_core/run/strategy_runner.py +++ b/utama_core/run/strategy_runner.py @@ -99,6 +99,9 @@ class StrategyRunner: mode (str): "real", "rsim", "grism" exp_friendly (int): Expected number of friendly robots. exp_enemy (int): Expected number of enemy robots. + exp_ball (bool): Whether the ball is expected to be present. + Only raises error when strategy expects ball but runtime does not provide it. + Defaults to True. field_bounds (FieldBounds): Configuration of the field. Defaults to standard field. opp_strategy (AbstractStrategy, optional): Opponent strategy for pvp. Defaults to None for single player. control_scheme (str, optional): Name of the motion control scheme to use. @@ -122,6 +125,7 @@ def __init__( mode: str, exp_friendly: int, exp_enemy: int, + exp_ball: bool = True, field_bounds: FieldBounds = Field.FULL_FIELD_BOUNDS, opp_strategy: Optional[AbstractStrategy] = None, control_scheme: str = "pid", # This is also the default control scheme used in the motion planning tests @@ -140,6 +144,7 @@ def __init__( self.mode: Mode = self._load_mode(mode) self.exp_friendly = exp_friendly self.exp_enemy = exp_enemy + self.exp_ball = exp_ball self.field_bounds = field_bounds self.vision_buffers, self.ref_buffer = self._setup_vision_and_referee() @@ -153,10 +158,17 @@ def __init__( ### functions below rely on self.my and self.opp ### self.rsim_env, self.sim_controller = self._load_sim(rsim_noise, rsim_vanishing) - self._assert_exp_robots(exp_friendly, exp_enemy) + self._assert_exp_robots_and_ball(exp_friendly, exp_enemy, exp_ball) self._load_robot_controllers() + # Remove Rsim ball. Rsim does not have the flexibilty to start without a ball. + # this must also be done after robot controllers are loaded and env reset + # the reset are embedded in the robot controllers so that the env can be controlled + # with the RsimRobotController even outside the context of StrategyRunner. + if self.rsim_env and not self.exp_ball: + self._remove_rsim_ball() + self._load_game() self._assert_exp_goals() @@ -187,6 +199,7 @@ def __init__( def _handle_sigint(self, sig, frame): self._stop_event.set() + signal.default_int_handler(sig, frame) def _load_mode(self, mode_str: str) -> Mode: """Convert a mode string to a Mode enum value. @@ -262,7 +275,9 @@ def _setup_sides_data( Tuple containing the SideRuntime for the friendly team and the opponent team (or None if no opponent strategy provided). """ opp_side = None - my_pos_ref, my_vel_ref, my_robot_ref = self._init_refiners(self.field_bounds, filtering=filtering) + my_pos_ref, my_vel_ref, my_robot_ref = self._init_refiners( + self.field_bounds, filtering=filtering, exp_ball=self.exp_ball + ) my_motion_controller = get_control_scheme(control_scheme) my_strategy.setup_behaviour_tree(is_opp_strat=False) my_side = SideRuntime( @@ -274,7 +289,9 @@ def _setup_sides_data( ) if opp_strategy is not None: - opp_pos_ref, opp_vel_ref, opp_robot_ref = self._init_refiners(self.field_bounds, filtering=filtering) + opp_pos_ref, opp_vel_ref, opp_robot_ref = self._init_refiners( + self.field_bounds, filtering=filtering, exp_ball=self.exp_ball + ) opp_motion_controller = ( get_control_scheme(opp_control_scheme) if opp_control_scheme is not None else my_motion_controller ) @@ -289,6 +306,11 @@ def _setup_sides_data( return my_side, opp_side + def _remove_rsim_ball(self): + """Removes the ball from the RSim environment by teleporting it off-field.""" + self.sim_controller.remove_ball() + self.rsim_env.step_noop() # Step the environment to apply the change + def _load_sim( self, rsim_noise: RsimGaussianNoise, @@ -323,11 +345,11 @@ def _load_sim( self.opp.strategy.load_rsim_env(rsim_env) self.my.strategy.load_rsim_env(rsim_env) - return rsim_env, RSimController(env=rsim_env) + return rsim_env, RSimController(field_bounds=self.field_bounds, exp_ball=self.exp_ball, env=rsim_env) elif self.mode == Mode.GRSIM: # can consider baking all of these directly into sim controller - sim_controller = GRSimController() + sim_controller = GRSimController(self.field_bounds, self.exp_ball) n_yellow, n_blue = map_friendly_enemy_to_colors(self.my_team_is_yellow, self.exp_friendly, self.exp_enemy) # Ensure the expected number of robots is met by teleporting them @@ -354,7 +376,11 @@ def _load_sim( sim_controller.set_robot_presence(b, False, True) b_start = blue_start[b] sim_controller.teleport_robot(False, b, b_start[0], b_start[1], b_start[2]) - sim_controller.teleport_ball(0, 0) + + if self.exp_ball: + sim_controller.teleport_ball(0, 0) + else: + sim_controller.remove_ball() return None, sim_controller @@ -376,24 +402,58 @@ def _setup_vision_and_referee(self) -> Tuple[deque, deque]: return vision_buffers, ref_buffer - def _assert_exp_robots( + def _assert_exp_robots_and_ball( self, exp_friendly: int, exp_enemy: int, - ): - """Assert the expected number of robots.""" - assert exp_friendly <= MAX_ROBOTS, "Expected number of friendly robots is too high." - assert exp_enemy <= MAX_ROBOTS, "Expected number of enemy robots is too high." - assert exp_friendly >= 1, "Expected number of friendly robots is too low." - assert exp_enemy >= 0, "Expected number of enemy robots is too low." - - assert self.my.strategy.assert_exp_robots( - exp_friendly, exp_enemy - ), "Expected number of robots at runtime does not match my strategy." + exp_ball: bool, + ) -> None: + """ + Validate that expected robot counts and ball presence are consistent + with both team strategies at runtime. + + This method performs runtime configuration checks to ensure that the + expected number of friendly and opponent robots, as well as ball + availability, match the assumptions declared by each strategy. + + Parameters + ---------- + exp_friendly : int + Expected number of friendly robots. + exp_enemy : int + Expected number of opponent robots. + exp_ball : bool + Whether a ball is expected to be present. + + Raises + ------ + ValueError + If robot counts fall outside valid bounds. + RuntimeError + If strategy expectations do not match runtime configuration. + """ + + if exp_friendly > MAX_ROBOTS: + raise ValueError(f"Expected number of friendly robots ({exp_friendly}) exceeds MAX_ROBOTS ({MAX_ROBOTS}).") + if exp_enemy > MAX_ROBOTS: + raise ValueError(f"Expected number of enemy robots ({exp_enemy}) exceeds MAX_ROBOTS ({MAX_ROBOTS}).") + if exp_friendly < 1: + raise ValueError(f"Expected number of friendly robots ({exp_friendly}) must be at least 1.") + if exp_enemy < 0: + raise ValueError(f"Expected number of enemy robots ({exp_enemy}) cannot be negative.") + + if not self.my.strategy.assert_exp_robots(exp_friendly, exp_enemy): + raise RuntimeError("Runtime robot count does not match expectations of my strategy.") + + if not exp_ball and self.my.strategy.exp_ball: + raise RuntimeError("Ball expected by my strategy, but not available in runtime configuration.") + if self.opp: - assert self.opp.strategy.assert_exp_robots( - exp_enemy, exp_friendly - ), "Expected number of robots at runtime does not match opponent strategy." + if not self.opp.strategy.assert_exp_robots(exp_enemy, exp_friendly): + raise RuntimeError("Runtime robot count does not match expectations of opponent strategy.") + + if not exp_ball and self.opp.strategy.exp_ball: + raise RuntimeError("Ball expected by opponent strategy, but not available in runtime configuration.") def _assert_exp_goals(self): """Assert the expected number of goals.""" @@ -466,18 +526,22 @@ def _init_refiners( self, field_bounds: FieldBounds, filtering: bool, + exp_ball: bool = True, ) -> tuple[PositionRefiner, VelocityRefiner, RobotInfoRefiner]: """ Initialize the position, velocity, and robot info refiners. Args: field_bounds (FieldBounds): The bounds of the field. filtering (bool): Whether to use filtering in the position refiner. + exp_ball (bool): Whether the ball is expected. When False, the position refiner is + allowed to return None if no ball is detected in raw vision data. Returns: tuple: The initialized PositionRefiner, VelocityRefiner, and RobotInfoRefiner. """ position_refiner = PositionRefiner( field_bounds, filtering=filtering, + exp_ball=exp_ball, ) velocity_refiner = VelocityRefiner() robot_info_refiner = RobotInfoRefiner() @@ -495,6 +559,7 @@ def _load_game(self): self.my_team_is_right, self.exp_friendly, self.exp_enemy, + self.exp_ball, self.vision_buffers, self.my.position_refiner, is_pvp=self.opp is not None, diff --git a/utama_core/strategy/common/abstract_strategy.py b/utama_core/strategy/common/abstract_strategy.py index 9a289b7d..5a1a4dd7 100644 --- a/utama_core/strategy/common/abstract_strategy.py +++ b/utama_core/strategy/common/abstract_strategy.py @@ -60,6 +60,13 @@ class AbstractStrategy(ABC): Base class for team strategies backed by behaviour trees. """ + ### To be specified in your abstract strategy ### + # whether the strategy expects the ball to be present on the field + # if True, and strat_runner exp_ball is False, strat_runner will raise an error and not run the strategy + # if False, but ball exists, we will just rock on! + exp_ball: bool = True + ################################################# + def __init__(self): self.behaviour_tree = py_trees.trees.BehaviourTree(self.create_behaviour_tree()) diff --git a/utama_core/strategy/examples/__init__.py b/utama_core/strategy/examples/__init__.py index dffe12f6..6d2bd63d 100644 --- a/utama_core/strategy/examples/__init__.py +++ b/utama_core/strategy/examples/__init__.py @@ -1,8 +1,19 @@ from utama_core.strategy.examples.defense_strategy import DefenceStrategy from utama_core.strategy.examples.go_to_ball_ex import GoToBallExampleStrategy +from utama_core.strategy.examples.motion_planning.multi_robot_navigation_strategy import ( + MultiRobotNavigationStrategy, +) +from utama_core.strategy.examples.motion_planning.oscillating_obstacle_strategy import ( + OscillatingObstacleStrategy, +) +from utama_core.strategy.examples.motion_planning.random_movement_strategy import ( + RandomMovementStrategy, +) +from utama_core.strategy.examples.motion_planning.simple_navigation_strategy import ( + SimpleNavigationStrategy, +) from utama_core.strategy.examples.one_robot_placement_strategy import ( RobotPlacementStrategy, ) from utama_core.strategy.examples.startup_strategy import StartupStrategy from utama_core.strategy.examples.two_robot_placement import TwoRobotPlacementStrategy -from utama_core.strategy.examples.point_cycle_strategy import PointCycleStrategy diff --git a/utama_core/strategy/examples/motion_planning/__init__.py b/utama_core/strategy/examples/motion_planning/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/utama_core/tests/motion_planning/strategies/multi_robot_navigation_strategy.py b/utama_core/strategy/examples/motion_planning/multi_robot_navigation_strategy.py similarity index 98% rename from utama_core/tests/motion_planning/strategies/multi_robot_navigation_strategy.py rename to utama_core/strategy/examples/motion_planning/multi_robot_navigation_strategy.py index ac822a1b..0e2a1bc5 100644 --- a/utama_core/tests/motion_planning/strategies/multi_robot_navigation_strategy.py +++ b/utama_core/strategy/examples/motion_planning/multi_robot_navigation_strategy.py @@ -19,6 +19,8 @@ class MultiRobotNavigationStrategy(AbstractStrategy): target_orientation: Optional target orientation for all robots (default: 0) """ + exp_ball = False # No ball interaction expected + def __init__( self, robot_targets: Dict[int, tuple[float, float]], diff --git a/utama_core/tests/motion_planning/strategies/oscillating_obstacle_strategy.py b/utama_core/strategy/examples/motion_planning/oscillating_obstacle_strategy.py similarity index 99% rename from utama_core/tests/motion_planning/strategies/oscillating_obstacle_strategy.py rename to utama_core/strategy/examples/motion_planning/oscillating_obstacle_strategy.py index 6a53c3bf..b02afea8 100644 --- a/utama_core/tests/motion_planning/strategies/oscillating_obstacle_strategy.py +++ b/utama_core/strategy/examples/motion_planning/oscillating_obstacle_strategy.py @@ -134,6 +134,8 @@ class OscillatingObstacleStrategy(AbstractStrategy): obstacle_configs: List of MovingObstacleConfig objects defining each obstacle's behavior """ + exp_ball = False # This strategy does not require the ball + def __init__(self, obstacle_configs: List["MovingObstacleConfig"]): self.obstacle_configs = obstacle_configs super().__init__() diff --git a/utama_core/strategy/examples/motion_planning/random_movement_strategy.py b/utama_core/strategy/examples/motion_planning/random_movement_strategy.py new file mode 100644 index 00000000..0194a088 --- /dev/null +++ b/utama_core/strategy/examples/motion_planning/random_movement_strategy.py @@ -0,0 +1,179 @@ +"""Strategy for random movement within bounded area.""" + +from __future__ import annotations + +import random +from typing import Callable, Optional + +import py_trees + +from utama_core.entities.data.vector import Vector2D +from utama_core.entities.game.field import FieldBounds +from utama_core.skills.src.utils.move_utils import move +from utama_core.strategy.common.abstract_behaviour import AbstractBehaviour +from utama_core.strategy.common.abstract_strategy import AbstractStrategy + + +class RandomMovementBehaviour(AbstractBehaviour): + """ + Behaviour that makes a robot move to random targets within bounds. + + Args: + robot_id: The robot ID to control. + field_bounds: Movement bounds. + endpoint_tolerance: Distance to consider target reached. + min_target_distance: Minimum distance from current position when selecting next target. + seed: Random seed for deterministic behaviour. + on_target_reached: Optional callback invoked with robot_id when a target is reached. + """ + + def __init__( + self, + robot_id: int, + field_bounds: FieldBounds, + endpoint_tolerance: float, + min_target_distance: float = 0.0, + seed: int = 42, + on_target_reached: Callable[[int], None] | None = None, + ): + super().__init__(name=f"RandomMovement_{robot_id}") + self.robot_id = robot_id + self.field_bounds = field_bounds + self.endpoint_tolerance = endpoint_tolerance + self.min_target_distance = min_target_distance + self.on_target_reached = on_target_reached + + self.current_target: Optional[Vector2D] = None + self.rng = random.Random(seed) + + # Pre-compute bounds for sampling + self._min_x = min(field_bounds.top_left[0], field_bounds.bottom_right[0]) + self._max_x = max(field_bounds.top_left[0], field_bounds.bottom_right[0]) + self._min_y = min(field_bounds.top_left[1], field_bounds.bottom_right[1]) + self._max_y = max(field_bounds.top_left[1], field_bounds.bottom_right[1]) + + def _generate_random_target(self, current_pos: Vector2D) -> Vector2D: + """Generate a random target within bounds, at least min_target_distance from current pos.""" + MAX_ATTEMPTS = 50 + PADDING = 0.3 + + for _ in range(MAX_ATTEMPTS): + x = self.rng.uniform(self._min_x + PADDING, self._max_x - PADDING) + y = self.rng.uniform(self._min_y + PADDING, self._max_y - PADDING) + target = Vector2D(x, y) + + if current_pos.distance_to(target) >= self.min_target_distance: + return target + + # Fallback: return a random position even if too close + x = self.rng.uniform(self._min_x + PADDING, self._max_x - PADDING) + y = self.rng.uniform(self._min_y + PADDING, self._max_y - PADDING) + return Vector2D(x, y) + + def initialise(self): + self.current_target = None + + def update(self) -> py_trees.common.Status: + """Command robot to move to random targets.""" + game = self.blackboard.game + rsim_env = self.blackboard.rsim_env + + if not game.friendly_robots or self.robot_id not in game.friendly_robots: + return py_trees.common.Status.RUNNING + + robot = game.friendly_robots[self.robot_id] + robot_pos = Vector2D(robot.p.x, robot.p.y) + + if self.current_target is None: + self.current_target = self._generate_random_target(robot_pos) + + if robot_pos.distance_to(self.current_target) <= self.endpoint_tolerance: + if self.on_target_reached is not None: + self.on_target_reached(self.robot_id) + self.current_target = self._generate_random_target(robot_pos) + + if rsim_env: + rsim_env.draw_point(self.current_target.x, self.current_target.y, color="green") + rsim_env.draw_line( + [ + (robot_pos.x, robot_pos.y), + (self.current_target.x, self.current_target.y), + ], + color="blue", + width=1, + ) + + cmd = move( + game, + self.blackboard.motion_controller, + self.robot_id, + self.current_target, + 0.0, + ) + + self.blackboard.cmd_map[self.robot_id] = cmd + return py_trees.common.Status.RUNNING + + +class RandomMovementStrategy(AbstractStrategy): + """ + Strategy that controls multiple robots moving randomly within bounds. + + Args: + n_robots: Number of robots to control. + field_bounds: Movement bounds. + endpoint_tolerance: Distance to consider target reached. + min_target_distance: Minimum distance from current position when selecting next target. + seed: Base seed for deterministic behaviour. Each robot gets ``seed + robot_id``. + on_target_reached: Optional callback invoked with robot_id when a target is reached. + """ + + exp_ball = False + + def __init__( + self, + n_robots: int, + field_bounds: FieldBounds, + endpoint_tolerance: float, + min_target_distance: float = 0.0, + seed: int = 42, + on_target_reached: Callable[[int], None] | None = None, + ): + self.n_robots = n_robots + self.field_bounds = field_bounds + self.endpoint_tolerance = endpoint_tolerance + self.min_target_distance = min_target_distance + self.seed = seed + self.on_target_reached = on_target_reached + super().__init__() + + def assert_exp_robots(self, n_runtime_friendly: int, n_runtime_enemy: int): + return n_runtime_friendly >= self.n_robots + + def assert_exp_goals(self, includes_my_goal_line: bool, includes_opp_goal_line: bool): + return True + + def get_min_bounding_zone(self) -> Optional[FieldBounds]: + return self.field_bounds + + def create_behaviour_tree(self) -> py_trees.behaviour.Behaviour: + """Create parallel behaviour tree with one RandomMovementBehaviour per robot.""" + + def make_behaviour(robot_id: int) -> RandomMovementBehaviour: + return RandomMovementBehaviour( + robot_id=robot_id, + field_bounds=self.field_bounds, + endpoint_tolerance=self.endpoint_tolerance, + min_target_distance=self.min_target_distance, + seed=self.seed + robot_id, + on_target_reached=self.on_target_reached, + ) + + if self.n_robots == 1: + return make_behaviour(0) + + return py_trees.composites.Parallel( + name="RandomMovement", + policy=py_trees.common.ParallelPolicy.SuccessOnAll(), + children=[make_behaviour(robot_id) for robot_id in range(self.n_robots)], + ) diff --git a/utama_core/tests/motion_planning/strategies/simple_navigation_strategy.py b/utama_core/strategy/examples/motion_planning/simple_navigation_strategy.py similarity index 98% rename from utama_core/tests/motion_planning/strategies/simple_navigation_strategy.py rename to utama_core/strategy/examples/motion_planning/simple_navigation_strategy.py index 15e33eee..4ebd161d 100644 --- a/utama_core/tests/motion_planning/strategies/simple_navigation_strategy.py +++ b/utama_core/strategy/examples/motion_planning/simple_navigation_strategy.py @@ -74,6 +74,8 @@ class SimpleNavigationStrategy(AbstractStrategy): target_orientation: Optional target orientation in radians (default: 0) """ + exp_ball = False # This strategy does not require the ball + def __init__( self, robot_id: int, diff --git a/utama_core/strategy/examples/point_cycle_strategy.py b/utama_core/strategy/examples/point_cycle_strategy.py deleted file mode 100644 index 0b725616..00000000 --- a/utama_core/strategy/examples/point_cycle_strategy.py +++ /dev/null @@ -1,182 +0,0 @@ -"""Strategy for random movement within bounded area.""" - -from __future__ import annotations - -import random -from collections import deque -from typing import Optional, Tuple - -import py_trees - -from utama_core.entities.data.vector import Vector2D -from utama_core.entities.game.field import FieldBounds -from utama_core.skills.src.utils.move_utils import move -from utama_core.strategy.common.abstract_behaviour import AbstractBehaviour -from utama_core.strategy.common.abstract_strategy import AbstractStrategy - - -class PointCycleBehaviour(AbstractBehaviour): - """ - Behaviour that makes a robot move to randomly sampled targets within bounds. - - Args: - robot_id (int): The robot ID to control. - field_bounds (FieldBounds): ((min_x, max_x), (min_y, max_y)) bounds for movement. - endpoint_tolerance (float): Distance to consider target reached. - seed (Optional[int]): Seed for deterministic random sampling. - """ - - def __init__( - self, - robot_id: int, - field_bounds: FieldBounds, - endpoint_tolerance: float, - seed: Optional[int] = None, - ): - super().__init__(name=f"RandomPoint_{robot_id}") - self.robot_id = robot_id - self.field_bounds = field_bounds - self.endpoint_tolerance = endpoint_tolerance - - self.current_target = None - self.points = RandomPointSampler(field_bounds, seed=seed) - - def initialise(self): - """Initialize with a random target and speed.""" - # Will set target on first update when we have robot position - pass - - def update(self) -> py_trees.common.Status: - """Command robot to move to random targets.""" - game = self.blackboard.game - rsim_env = self.blackboard.rsim_env - - if not game.friendly_robots or self.robot_id not in game.friendly_robots: - return py_trees.common.Status.RUNNING - - robot = game.friendly_robots[self.robot_id] - robot_pos = Vector2D(robot.p.x, robot.p.y) - - # Generate initial target if needed - if self.current_target is None: - self.current_target = self.points.next_point - - # Check if target reached - distance_to_target = robot_pos.distance_to(self.current_target) - if distance_to_target <= self.endpoint_tolerance: - # Generate new target and speed - self.current_target = self.points.next_point - - # Visualize target - if rsim_env: - rsim_env.draw_point(self.current_target.x, self.current_target.y, color="green") - # Draw a line to show path - rsim_env.draw_line( - [ - (robot_pos.x, robot_pos.y), - (self.current_target.x, self.current_target.y), - ], - color="blue", - width=1, - ) - - # Generate movement command - cmd = move( - game, - self.blackboard.motion_controller, - self.robot_id, - self.current_target, - 0.0, # Face forward - ) - - self.blackboard.cmd_map[self.robot_id] = cmd - return py_trees.common.Status.RUNNING - - -class PointCycleStrategy(AbstractStrategy): - """ - Strategy that instantiates one PointCycleBehaviour per friendly robot - and executes them in parallel within specified field bounds. - - Args: - n_robots (int): Number of robots to control. - field_bounds (FieldBounds): Movement bounds. - endpoint_tolerance (float): Distance to consider target reached. - seed (Optional[int]): Base seed for deterministic behaviour. - """ - - def __init__( - self, - n_robots: int, - field_bounds: FieldBounds, - endpoint_tolerance: float, - seed: Optional[int] = None, - ): - self.n_robots = n_robots - self.field_bounds = field_bounds - self.endpoint_tolerance = endpoint_tolerance - self.seed = seed - super().__init__() - - def assert_exp_robots(self, n_runtime_friendly: int, n_runtime_enemy: int): - """Requires number of friendly robots to match.""" - return n_runtime_friendly >= self.n_robots - - def assert_exp_goals(self, includes_my_goal_line: bool, includes_opp_goal_line: bool): - return True - - def get_min_bounding_zone(self) -> Optional[FieldBounds]: - """Return the movement bounds.""" - return self.field_bounds - - def create_behaviour_tree(self) -> py_trees.behaviour.Behaviour: - """Create parallel behaviour tree with all robot random movement behaviours.""" - if self.n_robots == 1: - return PointCycleBehaviour( - robot_id=0, - field_bounds=self.field_bounds, - endpoint_tolerance=self.endpoint_tolerance, - seed=self.seed, - ) - - behaviours = [] - for robot_id in range(self.n_robots): - behaviour = PointCycleBehaviour( - robot_id=robot_id, - field_bounds=self.field_bounds, - endpoint_tolerance=self.endpoint_tolerance, - seed=None if self.seed is None else self.seed + robot_id, - ) - behaviours.append(behaviour) - - return py_trees.composites.Parallel( - name="RandomPoint", - policy=py_trees.common.ParallelPolicy.SuccessOnAll(), - children=behaviours, - ) - - -class RandomPointSampler: - """ - Uniform random point sampler within rectangular field bounds. - - Args: - field_bounds (FieldBounds): ((min_x, max_x), (min_y, max_y)) - seed (Optional[int]): Random seed for deterministic sampling. - """ - - def __init__(self, field_bounds: FieldBounds, seed: int = 42): - self.field_bounds = field_bounds - self.rng = random.Random(seed) - - @property - def next_point(self) -> Vector2D: - min_x = self.field_bounds.bottom_right[0] - max_x = self.field_bounds.top_left[0] - min_y = self.field_bounds.bottom_right[1] - max_y = self.field_bounds.top_left[1] - - x = self.rng.uniform(min_x, max_x) - y = self.rng.uniform(min_y, max_y) - - return Vector2D(x=x, y=y) diff --git a/utama_core/team_controller/src/controllers/common/sim_controller_abstract.py b/utama_core/team_controller/src/controllers/common/sim_controller_abstract.py index 1efa5de3..2e00e832 100644 --- a/utama_core/team_controller/src/controllers/common/sim_controller_abstract.py +++ b/utama_core/team_controller/src/controllers/common/sim_controller_abstract.py @@ -1,22 +1,54 @@ import abc +from utama_core.config.settings import OFF_PITCH_OFFSET +from utama_core.entities.game.field import FieldBounds +from utama_core.global_utils.math_utils import in_field_bounds + class AbstractSimController: """Template for generic sim controller, allowing actions such.""" - @abc.abstractmethod - def teleport_ball(self, x: float, y: float) -> None: + def __init__(self, field_bounds: FieldBounds, exp_ball: bool = True): + self.field_bounds = field_bounds + self.exp_ball = exp_ball + + def remove_ball(self) -> None: + """ + Removes the ball from play by teleporting it outside the field + offset by OFF_PITCH_OFFSET to the bottom right of the field. + """ + + bottom_right = self.field_bounds.bottom_right + + x = bottom_right[0] + OFF_PITCH_OFFSET + y = bottom_right[1] + OFF_PITCH_OFFSET + + self._do_teleport_ball_unrestricted(x, y, 0, 0) + + def teleport_ball(self, x: float, y: float, vx: float = 0, vy: float = 0) -> None: """Teleports the ball to a specific location on the field. Args: - x (float): The x-coordinate to place the ball at (in meters [-4.5, 4.5]). - y (float): The y-coordinate to place the ball at (in meters [-3.0, 3.0]). + x (float): The x-coordinate to place the ball in meters within FieldBounds. + y (float): The y-coordinate to place the ball in meters within FieldBounds. + vx (float): The velocity of the ball in the x-direction (in meters per second). + vy (float): The velocity of the ball in the y-direction (in meters per second). + + Method does not allow for teleporting the ball outside of the field boundaries. This method creates a command for teleporting the ball and sends it to the simulator. """ - ... + if not self.exp_ball: + raise ValueError( + "This controller is configured to not expect a ball, so teleporting the ball is not allowed." + ) + in_field = in_field_bounds((x, y), self.field_bounds) + if not in_field: + raise ValueError( + f"Cannot teleport ball to ({x}, {y}) as it is outside of the field boundaries defined by {self.field_bounds}." + ) + self._do_teleport_ball_unrestricted(x, y, vx, vy) - @abc.abstractmethod def teleport_robot( self, is_team_yellow: bool, @@ -30,23 +62,51 @@ def teleport_robot( Args: is_team_yellow (bool): if the robot is team yellow, else blue robot_id (int): robot id - x (float): The x-coordinate to place the ball at (in meters [-4.5, 4.5]). - y (float): The y-coordinate to place the ball at (in meters [-3.0, 3.0]). + x (float): The x-coordinate to place the robot in meters within FieldBounds. + y (float): The y-coordinate to place the robot in meters within FieldBounds. theta (float): radian angle of the robot heading, 0 degrees faces towards positive x axis - This method creates a command for teleporting the ball and sends it to the simulator. + Method does not allow for teleporting the robot outside of the field boundaries. + + This method creates a command for teleporting the robot and sends it to the simulator. """ - ... + in_field = in_field_bounds((x, y), self.field_bounds) + if not in_field: + raise ValueError( + f"Cannot teleport robot to ({x}, {y}) as it is outside of the field boundaries defined by {self.field_bounds}." + ) + self._do_teleport_robot_unrestricted(is_team_yellow, robot_id, x, y, theta) + + ### Below methods are implemented in the specific sim controllers #### @abc.abstractmethod - def set_robot_presence(self, is_team_yellow: bool, robot_id: int, should_robot_be_present: bool) -> None: + def set_robot_presence(self, robot_id: int, is_team_yellow: bool, should_robot_be_present: bool) -> None: """Sets a robot's presence on the field by teleporting it to a specific location or removing it from the field. Args: robot_id (int): The unique ID of the robot. - team_colour_is_blue (bool): Whether the robot belongs to the blue team. If False, it's assumed to be yellow. + is_team_yellow (bool): Whether the robot belongs to the yellow team. If False, it's assumed to be blue. should_robot_be_present (bool): If True, the robot will be placed on the field; if False, it will be removed. The method calculates a teleport location based on the team and presence status, then sends a command to the simulator. """ ... + + @abc.abstractmethod + def _do_teleport_ball_unrestricted(self, x: float, y: float, vx: float, vy: float) -> None: + """Teleports the ball to a specific location on the field without any boundary checks.""" + ... + + @abc.abstractmethod + def _do_teleport_robot_unrestricted( + self, + is_team_yellow: bool, + robot_id: int, + x: float, + y: float, + theta: float = None, + ) -> None: + """Teleports a robot to a specific location on the field without any boundary checks.""" + ... + + ### End of abstract methods ### diff --git a/utama_core/team_controller/src/controllers/sim/grsim_controller.py b/utama_core/team_controller/src/controllers/sim/grsim_controller.py index 0654a0e3..f8b0ad7f 100644 --- a/utama_core/team_controller/src/controllers/sim/grsim_controller.py +++ b/utama_core/team_controller/src/controllers/sim/grsim_controller.py @@ -9,6 +9,7 @@ SIM_CONTROL_PORT, TELEPORT_X_COORDS, ) +from utama_core.entities.game.field import FieldBounds from utama_core.team_controller.src.controllers.common.sim_controller_abstract import ( AbstractSimController, ) @@ -34,7 +35,14 @@ class GRSimController(AbstractSimController): port (int): Port of the simulator. Defaults to SIM_CONTROL_PORT. """ - def __init__(self, ip: str = LOCAL_HOST, port: int = SIM_CONTROL_PORT): + def __init__( + self, + field_bounds: FieldBounds, + exp_ball: bool = True, + ip: str = LOCAL_HOST, + port: int = SIM_CONTROL_PORT, + ): + super().__init__(field_bounds, exp_ball) self.net = network_manager.NetworkManager(address=(ip, port)) def _create_simulator_command(self, control_message: object) -> object: @@ -42,7 +50,7 @@ def _create_simulator_command(self, control_message: object) -> object: sim_command.control.CopyFrom(control_message) return sim_command - def teleport_ball(self, x: float, y: float, vx: float = 0, vy: float = 0) -> None: + def _do_teleport_ball_unrestricted(self, x: float, y: float, vx: float = 0, vy: float = 0) -> None: """Teleports the ball to a specific location on the field. Args: @@ -69,9 +77,12 @@ def reset(self): self.teleport_robot(True, idx, x[0], x[1], x[2]) for idx, x in enumerate(formations.LEFT_START_ONE): self.teleport_robot(False, idx, x[0], x[1], x[2]) - self.teleport_ball(0, 0, 0, 0) + if self.exp_ball: + self.teleport_ball(0, 0, 0, 0) + else: + self.remove_ball() - def teleport_robot( + def _do_teleport_robot_unrestricted( self, is_team_yellow: bool, robot_id: int, @@ -94,18 +105,18 @@ def teleport_robot( sim_command = self._create_simulator_command(sim_control) self.net.send_command(sim_command) - def set_robot_presence(self, robot_id: int, is_team_yellow: bool, is_present: bool) -> None: + def set_robot_presence(self, robot_id: int, is_team_yellow: bool, should_robot_be_present: bool) -> None: """Sets a robot's presence on the field by teleporting it on and off the field. Args: robot_id (int): The unique ID of the robot. - team_colour_is_blue (bool): Whether the robot belongs to the blue team. If False, it's assumed to be yellow. - is_present (bool): If True, the robot will be placed on the field; if False, it will be despawned. + is_team_yellow (bool): Whether the robot belongs to the yellow team. If False, it's assumed to be blue. + should_robot_be_present (bool): If True, the robot will be placed on the field; if False, it will be despawned. The method calculates a teleport location based on the team and presence status, then sends a command to the simulator. """ - x, y = self._get_teleport_location(robot_id, is_team_yellow, is_present) - sim_control = self._create_teleport_robot_command(robot_id, is_team_yellow, x, y, is_present) + x, y = self._get_teleport_location(robot_id, is_team_yellow, should_robot_be_present) + sim_control = self._create_teleport_robot_command(robot_id, is_team_yellow, x, y, should_robot_be_present) sim_command = self._create_simulator_command(sim_control) self.net.send_command(sim_command) diff --git a/utama_core/team_controller/src/controllers/sim/rsim_controller.py b/utama_core/team_controller/src/controllers/sim/rsim_controller.py index 50ef1cd6..e6ab263f 100644 --- a/utama_core/team_controller/src/controllers/sim/rsim_controller.py +++ b/utama_core/team_controller/src/controllers/sim/rsim_controller.py @@ -1,6 +1,7 @@ from typing import Tuple from utama_core.config.settings import ADD_Y_COORD, REMOVAL_Y_COORD, TELEPORT_X_COORDS +from utama_core.entities.game.field import FieldBounds from utama_core.rsoccer_simulator.src.ssl.ssl_gym_base import SSLBaseEnv from utama_core.team_controller.src.controllers.common.sim_controller_abstract import ( AbstractSimController, @@ -11,39 +12,14 @@ class RSimController(AbstractSimController): """A controller for interacting with a simulation environment for robot soccer, allowing actions such as teleporting the ball and setting robot presence on the field.""" - def __init__(self, env: SSLBaseEnv): + def __init__(self, field_bounds: FieldBounds, exp_ball: bool, env: SSLBaseEnv): + super().__init__(field_bounds, exp_ball) self._env = env - def teleport_ball(self, x: float, y: float, vx: float = 0, vy: float = 0) -> None: - """Teleports the ball to a specific location on the field. - - Args: - x (float): The x-coordinate to place the ball at (in meters [-4.5, 4.5]). - y (float): The y-coordinate to place the ball at (in meters [-3.0, 3.0]). - - This method creates a command for teleporting the ball and sends it to the simulator. - """ + def _do_teleport_ball_unrestricted(self, x, y, vx, vy): self._env.teleport_ball(x, y, vx, vy) - def teleport_robot( - self, - is_team_yellow: bool, - robot_id: int, - x: float, - y: float, - theta: float = None, - ) -> None: - """Teleports a robot to a specific location on the field. - - Args: - is_team_yellow (bool): if the robot is team yellow, else blue - robot_id (int): robot id - x (float): The x-coordinate to place the ball at (in meters [-4.5, 4.5]). - y (float): The y-coordinate to place the ball at (in meters [-3.0, 3.0]). - theta (float): radian angle of the robot heading, 0 degrees faces towards positive x axis - - This method creates a command for teleporting the ball and sends it to the simulator. - """ + def _do_teleport_robot_unrestricted(self, is_team_yellow, robot_id, x, y, theta=None): self._env.teleport_robot(is_team_yellow, robot_id, x, y, theta) def set_robot_presence(self, robot_id: int, is_team_yellow: bool, should_robot_be_present: bool) -> None: @@ -51,13 +27,13 @@ def set_robot_presence(self, robot_id: int, is_team_yellow: bool, should_robot_b Args: robot_id (int): The unique ID of the robot. - team_colour_is_yellow (bool): Whether the robot belongs to the yellow team. If False, it's assumed to be blue. + is_team_yellow (bool): Whether the robot belongs to the yellow team. If False, it's assumed to be blue. should_robot_be_present (bool): If True, the robot will be placed on the field; if False, it will be removed. The method calculates a teleport location based on the team and presence status, then sends a command to the simulator. """ x, y = self._get_teleport_location(robot_id, is_team_yellow, should_robot_be_present) - self.teleport_robot(is_team_yellow, robot_id, x, y, 0) + self._do_teleport_robot_unrestricted(is_team_yellow, robot_id, x, y, 0) def _get_teleport_location(self, robot_id: int, is_team_yellow: bool, add: bool) -> Tuple[float, float]: y_coord = ADD_Y_COORD if add else REMOVAL_Y_COORD diff --git a/utama_core/team_controller/src/controllers/sim/rsim_robot_controller.py b/utama_core/team_controller/src/controllers/sim/rsim_robot_controller.py index ab5e24bd..bb1504bd 100644 --- a/utama_core/team_controller/src/controllers/sim/rsim_robot_controller.py +++ b/utama_core/team_controller/src/controllers/sim/rsim_robot_controller.py @@ -16,6 +16,8 @@ class RSimRobotController(AbstractRobotController): """Robot Controller (and Vision Receiver) for RSim. + Note: also does the first reset for the environment. + pvp_manager: if not None, two controllers are playing against each other. Else, play against static @@ -27,7 +29,7 @@ def __init__( is_team_yellow: bool, n_friendly: int, env: SSLBaseEnv, - pvp_manager=None, # RSimPVPRobotController, cannot forward declare + pvp_manager: Optional["RSimPVPManager"] = None, ): super().__init__(is_team_yellow, n_friendly) self._env = env @@ -35,16 +37,16 @@ def __init__( self._out_packet = self._empty_command(self._n_friendly) self._pvp_manager = pvp_manager - if not self.pvp_manager: - self.env.reset() + if self._pvp_manager is None: + self._env.reset() def get_robots_responses(self) -> Optional[List[RobotResponse]]: return self._robots_info.popleft() if len(self._robots_info) > 0 else None def send_robot_commands(self) -> None: """Sends the robot commands to the appropriate team (yellow or blue).""" - if self.pvp_manager: - self.pvp_manager.send_command(self.is_team_yellow, self._out_packet) + if self._pvp_manager is not None: + self._pvp_manager.send_command(self.is_team_yellow, self._out_packet) else: if self.is_team_yellow: action = { diff --git a/utama_core/team_controller/src/debug_utils/grsim_controller_test.py b/utama_core/team_controller/src/debug_utils/grsim_controller_test.py deleted file mode 100644 index 53bc556f..00000000 --- a/utama_core/team_controller/src/debug_utils/grsim_controller_test.py +++ /dev/null @@ -1,14 +0,0 @@ -import numpy as np - -from utama_core.team_controller.src.controllers import GRSimController - - -def main(): - controller = GRSimController() - controller.teleport_ball(0, 0) - controller.set_robot_presence(robot_id=0, is_team_yellow=False, is_present=False) - controller.teleport_robot(is_team_yellow=True, robot_id=0, x=1, y=1, theta=np.pi) - - -if __name__ == "__main__": - main() diff --git a/utama_core/team_controller/src/debug_utils/ref_receiver_sim_controller_integration_test.py b/utama_core/team_controller/src/debug_utils/ref_receiver_sim_controller_integration_test.py deleted file mode 100644 index 6517c34d..00000000 --- a/utama_core/team_controller/src/debug_utils/ref_receiver_sim_controller_integration_test.py +++ /dev/null @@ -1,98 +0,0 @@ -import logging -import time - -from utama_core.team_controller.src.controllers import GRSimController -from utama_core.team_controller.src.data import RefereeMessageReceiver -from utama_core.team_controller.src.generated_code.ssl_gc_referee_message_pb2 import ( - Referee, -) - -""" TODO: Traceback (most recent call last): - File "/home/fredh/robocup_ssl/Robocup/src/tests/referee_receiver_test.py", line 25, in - if receiver.check_new_message(): - File "/home/fredh/robocup_ssl/Robocup/src/data/referee_receiver.py", line 34, in check_new_message - serialized_data = self._serialize_relevant_fields(data) - File "/home/fredh/robocup_ssl/Robocup/src/data/referee_receiver.py", line 23, in _serialize_relevant_fields - message_copy.ParseFromString(data) -TypeError: a bytes-like object is required, not 'NoneType' """ - -# TODO: Imcomplete implementation -logger = logging.getLogger(__name__) - -# Example usage: -if __name__ == "__main__": - receiver = RefereeMessageReceiver() - sim_control = GRSimController() - # TODO: implement place -> stop -> place -> stop sequence - desired_sequenceA = [ - Referee.BALL_PLACEMENT_YELLOW, - Referee.BALL_PLACEMENT_BLUE, - Referee.STOP, - ] - desired_sequenceB = [ - Referee.BALL_PLACEMENT_BLUE, - Referee.BALL_PLACEMENT_YELLOW, - Referee.STOP, - ] - - try: - while True: - start_time = time.time() - if receiver.check_new_message(): - command, des_pos = receiver.get_latest_command() - message = receiver.get_latest_message() - # Yellow card stuff - yellow_team_yellow_card_times = list(message.yellow.yellow_card_times) - blue_team_yellow_card_times = list(message.blue.yellow_card_times) - yellow_team_robots_removed = [] - blue_team_robots_removed = [] - id = 0 - - logger.info(f"Yellow team yellow card times: {yellow_team_yellow_card_times}") - logger.info(f"Blue team yellow card times: {blue_team_yellow_card_times}") - if len(yellow_team_yellow_card_times) == 0: - pass # print("!!!!!!!!!") - - if len(yellow_team_yellow_card_times) != 0 and len(yellow_team_yellow_card_times) > len( - yellow_team_robots_removed - ): - yellow_team_robots_removed.append(id) - for yellow_team_yellow_card_time in yellow_team_yellow_card_times[ - len(yellow_team_robots_removed) - 1 : - ]: - logger.info("Yellow card detected! (yellow)") - # TODO: Implement method to chose which robot to remove - sim_control.set_robot_presence(id, team_colour_is_blue=False, should_robot_be_present=False) - elif (robots_to_add := len(yellow_team_robots_removed) - len(yellow_team_yellow_card_times)) > 0: - for _ in range(robots_to_add): - sim_control.set_robot_presence( - yellow_team_robots_removed[0], - team_colour_is_blue=False, - should_robot_be_present=True, - ) - yellow_team_robots_removed.pop(0) - - if len(blue_team_yellow_card_times) != 0: - blue_team_robots_removed.append(id) - for blue_team_yellow_card_time in blue_team_yellow_card_times[len(blue_team_robots_removed) - 1 :]: - logger.info("Yellow card detected! (blue)") - sim_control.set_robot_presence(id, team_colour_is_blue=True, should_robot_be_present=False) - elif (robots_to_add := len(blue_team_robots_removed) - len(blue_team_yellow_card_times)) > 0: - for _ in range(robots_to_add): - sim_control.set_robot_presence( - blue_team_robots_removed[0], - team_colour_is_blue=True, - should_robot_be_present=True, - ) - blue_team_robots_removed.pop(0) - - # "Manual" automatic ball placement - if receiver.check_new_command(): - if receiver.check_command_sequence(desired_sequenceA or desired_sequenceB): - logger.info("Desired sequence detected!") - if des_pos != (0.0, 0.0): - logger.info("Teleporting ball to", des_pos) - sim_control.teleport_ball(des_pos[0] / 1000, des_pos[1] / 1000) - time.sleep(max(0, 0.03334 - (time.time() - start_time))) - except KeyboardInterrupt: - print("\nExiting...") diff --git a/utama_core/team_controller/src/debug_utils/rsim_controllers_test.py b/utama_core/team_controller/src/debug_utils/rsim_controllers_test.py deleted file mode 100644 index 72c65915..00000000 --- a/utama_core/team_controller/src/debug_utils/rsim_controllers_test.py +++ /dev/null @@ -1,39 +0,0 @@ -import logging - -from utama_core.entities.data.command import RobotCommand -from utama_core.entities.game import Game -from utama_core.rsoccer_simulator.src.ssl.envs.standard_ssl import SSLStandardEnv -from utama_core.team_controller.src.controllers import ( - RSimController, - RSimRobotController, -) - -logger = logging.getLogger(__name__) - -game = Game() - -# making environment: you can set the number of robots for blue and yellow -# their starting formation will follow the default start expressed in team_controller.src.config.starting_formation -# but you can modify this directly to create specific setup scenarios -env = SSLStandardEnv(n_robots_blue=3, n_robots_yellow=5) -robot_controller = RSimRobotController(is_team_yellow=True, env=env, game_obj=game) -game_controller = RSimController(env=env) - -# Run for 1 episode and print reward at the end -for i in range(10000): - if i > 100: - # follows GRSimRootController setup. Add robot commands them send them like below - robot_controller.add_robot_commands(RobotCommand(0, -0.01, 0, 0, 0, 1), 3) - robot_controller.add_robot_commands(RobotCommand(0, 0, 1, 0, 0, 0), 4) - if i == 150: - # RSimController gives you the abulity to teleport the ball directly - # note that neither teleport operations create a new frame, only modifies current frame. - game_controller.teleport_ball(x=1.5, y=1.5, vx=1, vy=5) - logger.info("Ball has been teleported!") - if i == 152: - # RSimController gives you the ability to teleport robots directly - game_controller.teleport_robot(is_team_yellow=True, robot_id=3, x=1.2, y=-2, theta=-1.5) - logger.info("Robot 3 (yellow) has been teleported!") - - # send robot commands after adding them - robot_controller.send_robot_commands() diff --git a/utama_core/tests/abstract_strategy/test_assertions.py b/utama_core/tests/abstract_strategy/test_assertions.py index 9d606554..cfe8201b 100644 --- a/utama_core/tests/abstract_strategy/test_assertions.py +++ b/utama_core/tests/abstract_strategy/test_assertions.py @@ -17,6 +17,8 @@ def make_dummy_blackboard(actual_field_bounds): # Helper strategy class DummyStrategy(AbstractStrategy): + exp_ball = True # Not relevant for these tests + def create_behaviour_tree(self): return AbstractBehaviour() diff --git a/utama_core/tests/controller/__init__.py b/utama_core/tests/controller/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/utama_core/tests/controller/test_sim_controller.py b/utama_core/tests/controller/test_sim_controller.py new file mode 100644 index 00000000..4d721d60 --- /dev/null +++ b/utama_core/tests/controller/test_sim_controller.py @@ -0,0 +1,270 @@ +"""Tests for AbstractSimController bounds-checking behaviour.""" + +from unittest.mock import MagicMock, call, patch + +import pytest + +from utama_core.entities.game.field import Field, FieldBounds +from utama_core.team_controller.src.controllers.common.sim_controller_abstract import ( + AbstractSimController, +) + +# --------------------------------------------------------------------------- +# Concrete stub – fulfils abstract interface without any real I/O +# --------------------------------------------------------------------------- + + +class StubSimController(AbstractSimController): + """Minimal concrete subclass for unit-testing AbstractSimController.""" + + def __init__(self, field_bounds: FieldBounds, exp_ball: bool = True): + super().__init__(field_bounds, exp_ball) + self.teleport_ball_calls: list[tuple] = [] + self.teleport_robot_calls: list[tuple] = [] + + def _do_teleport_ball_unrestricted(self, x, y, vx, vy): + self.teleport_ball_calls.append((x, y, vx, vy)) + + def _do_teleport_robot_unrestricted(self, is_team_yellow, robot_id, x, y, theta=None): + self.teleport_robot_calls.append((is_team_yellow, robot_id, x, y, theta)) + + def set_robot_presence(self, robot_id, is_team_yellow, should_robot_be_present): + pass # not under test here + + +# --------------------------------------------------------------------------- +# Shared fixtures +# --------------------------------------------------------------------------- + +# Standard SSL full-field bounds: x ∈ [-4.5, 4.5], y ∈ [-3.0, 3.0] +FULL_BOUNDS = Field.FULL_FIELD_BOUNDS + +# A small custom bounds used for many tests: x ∈ [0, 2], y ∈ [-1, 1] +CUSTOM_BOUNDS = FieldBounds(top_left=(0.0, 1.0), bottom_right=(2.0, -1.0)) + + +@pytest.fixture +def ctrl(): + """Controller using custom bounds with ball expected.""" + return StubSimController(CUSTOM_BOUNDS, exp_ball=True) + + +@pytest.fixture +def ctrl_no_ball(): + """Controller configured without a ball.""" + return StubSimController(CUSTOM_BOUNDS, exp_ball=False) + + +@pytest.fixture +def ctrl_full(): + """Controller using full-field bounds.""" + return StubSimController(FULL_BOUNDS, exp_ball=True) + + +# =========================================================================== +# teleport_ball +# =========================================================================== + + +class TestTeleportBall: + # --- happy-path --- + + def test_inside_bounds_calls_unrestricted(self, ctrl): + ctrl.teleport_ball(1.0, 0.0) + assert ctrl.teleport_ball_calls == [(1.0, 0.0, 0, 0)] + + def test_velocity_forwarded(self, ctrl): + ctrl.teleport_ball(1.0, 0.0, vx=0.5, vy=-0.3) + assert ctrl.teleport_ball_calls == [(1.0, 0.0, 0.5, -0.3)] + + def test_center_of_bounds_accepted(self, ctrl): + ctrl.teleport_ball(1.0, 0.0) # center of CUSTOM_BOUNDS + assert len(ctrl.teleport_ball_calls) == 1 + + @pytest.mark.parametrize( + "x, y", + [ + (0.0, 0.0), # min-x, center-y corner + (2.0, 0.0), # max-x, center-y corner + (1.0, 1.0), # center-x, max-y boundary + (1.0, -1.0), # center-x, min-y boundary + (0.0, 1.0), # top-left exact corner + (2.0, -1.0), # bottom-right exact corner + ], + ) + def test_boundary_positions_accepted(self, ctrl, x, y): + ctrl.teleport_ball(x, y) + assert len(ctrl.teleport_ball_calls) == 1 + + # --- out-of-bounds --- + + @pytest.mark.parametrize( + "x, y", + [ + (3.0, 0.0), # x too large + (-1.0, 0.0), # x too small + (1.0, 2.0), # y too large + (1.0, -2.0), # y too small + (2.1, -1.1), # both axes out + (0.0, 1.1), # y just over the top + (2.0, -1.01), # y just under the bottom + ], + ) + def test_out_of_bounds_raises(self, ctrl, x, y): + with pytest.raises(ValueError, match=r"outside of the field boundaries"): + ctrl.teleport_ball(x, y) + + def test_out_of_bounds_does_not_call_unrestricted(self, ctrl): + with pytest.raises(ValueError): + ctrl.teleport_ball(99.0, 99.0) + assert ctrl.teleport_ball_calls == [] + + # --- exp_ball=False --- + + def test_raises_when_ball_not_expected(self, ctrl_no_ball): + with pytest.raises(ValueError, match=r"not expect a ball"): + ctrl_no_ball.teleport_ball(1.0, 0.0) + + def test_ball_not_expected_does_not_check_bounds(self, ctrl_no_ball): + """exp_ball check must fire before bounds check.""" + with pytest.raises(ValueError, match=r"not expect a ball"): + ctrl_no_ball.teleport_ball(99.0, 99.0) + + def test_ball_not_expected_unrestricted_not_called(self, ctrl_no_ball): + with pytest.raises(ValueError): + ctrl_no_ball.teleport_ball(1.0, 0.0) + assert ctrl_no_ball.teleport_ball_calls == [] + + # --- full-field bounds --- + + @pytest.mark.parametrize( + "x, y", + [ + (0.0, 0.0), + (-4.5, 3.0), # top-left + (4.5, -3.0), # bottom-right + (4.5, 3.0), # top-right + (-4.5, -3.0), # bottom-left + ], + ) + def test_full_field_boundary_accepted(self, ctrl_full, x, y): + ctrl_full.teleport_ball(x, y) + assert len(ctrl_full.teleport_ball_calls) == 1 + + @pytest.mark.parametrize( + "x, y", + [ + (4.51, 0.0), + (-4.51, 0.0), + (0.0, 3.01), + (0.0, -3.01), + ], + ) + def test_just_outside_full_field_raises(self, ctrl_full, x, y): + with pytest.raises(ValueError): + ctrl_full.teleport_ball(x, y) + + +# =========================================================================== +# teleport_robot +# =========================================================================== + + +class TestTeleportRobot: + # --- happy-path --- + + def test_inside_bounds_calls_unrestricted(self, ctrl): + ctrl.teleport_robot(True, 0, 1.0, 0.0, 0.0) + assert ctrl.teleport_robot_calls == [(True, 0, 1.0, 0.0, 0.0)] + + def test_theta_forwarded(self, ctrl): + import math + + ctrl.teleport_robot(False, 2, 1.0, 0.5, math.pi / 2) + assert ctrl.teleport_robot_calls == [(False, 2, 1.0, 0.5, math.pi / 2)] + + def test_theta_none_forwarded(self, ctrl): + ctrl.teleport_robot(True, 1, 1.0, 0.0) + assert ctrl.teleport_robot_calls == [(True, 1, 1.0, 0.0, None)] + + @pytest.mark.parametrize( + "x, y", + [ + (0.0, 0.0), + (2.0, 0.0), + (1.0, 1.0), + (1.0, -1.0), + (0.0, 1.0), + (2.0, -1.0), + ], + ) + def test_boundary_positions_accepted(self, ctrl, x, y): + ctrl.teleport_robot(True, 0, x, y) + assert len(ctrl.teleport_robot_calls) == 1 + + # --- out-of-bounds --- + + @pytest.mark.parametrize( + "x, y", + [ + (3.0, 0.0), + (-0.1, 0.0), + (1.0, 1.1), + (1.0, -1.1), + (-1.0, -2.0), + ], + ) + def test_out_of_bounds_raises(self, ctrl, x, y): + with pytest.raises(ValueError, match=r"outside of the field boundaries"): + ctrl.teleport_robot(True, 0, x, y) + + def test_out_of_bounds_does_not_call_unrestricted(self, ctrl): + with pytest.raises(ValueError): + ctrl.teleport_robot(True, 0, 99.0, 99.0) + assert ctrl.teleport_robot_calls == [] + + # --- exp_ball flag should NOT affect robot teleport --- + + def test_robot_teleport_allowed_when_no_ball(self, ctrl_no_ball): + ctrl_no_ball.teleport_robot(True, 0, 1.0, 0.0) + assert ctrl_no_ball.teleport_robot_calls == [(True, 0, 1.0, 0.0, None)] + + def test_robot_out_of_bounds_still_raises_when_no_ball(self, ctrl_no_ball): + with pytest.raises(ValueError, match=r"outside of the field boundaries"): + ctrl_no_ball.teleport_robot(True, 0, 99.0, 0.0) + + # --- team flag --- + + @pytest.mark.parametrize("is_yellow", [True, False]) + def test_team_flag_forwarded(self, ctrl, is_yellow): + ctrl.teleport_robot(is_yellow, 3, 1.0, 0.0, 0.0) + assert ctrl.teleport_robot_calls[0][0] == is_yellow + + @pytest.mark.parametrize("rid", [0, 1, 5]) + def test_robot_id_forwarded(self, ctrl, rid): + ctrl.teleport_robot(True, rid, 1.0, 0.0, 0.0) + assert ctrl.teleport_robot_calls[0][1] == rid + + +# =========================================================================== +# remove_ball +# =========================================================================== + + +class TestRemoveBall: + def test_remove_ball_calls_unrestricted(self, ctrl): + ctrl.remove_ball() + assert len(ctrl.teleport_ball_calls) == 1 + + def test_remove_ball_places_outside_field(self, ctrl): + ctrl.remove_ball() + x, y, _, _ = ctrl.teleport_ball_calls[0] + # The removed position must be outside the controller's own field bounds + from utama_core.global_utils.math_utils import in_field_bounds + + assert not in_field_bounds((x, y), ctrl.field_bounds) + + def test_remove_ball_works_when_ball_not_expected(self, ctrl_no_ball): + """remove_ball bypasses exp_ball guard (it calls the unrestricted method directly).""" + ctrl_no_ball.remove_ball() + assert len(ctrl_no_ball.teleport_ball_calls) == 1 diff --git a/utama_core/tests/motion_planning/multiple_robots_test.py b/utama_core/tests/motion_planning/multiple_robots_test.py index 377f37b0..9191f03a 100644 --- a/utama_core/tests/motion_planning/multiple_robots_test.py +++ b/utama_core/tests/motion_planning/multiple_robots_test.py @@ -7,6 +7,7 @@ from utama_core.entities.data.vector import Vector2D from utama_core.entities.game import Game from utama_core.run import StrategyRunner +from utama_core.strategy.examples import MultiRobotNavigationStrategy from utama_core.team_controller.src.controllers import AbstractSimController from utama_core.tests.common.abstract_test_manager import ( AbstractTestManager, @@ -51,8 +52,6 @@ def reset_field(self, sim_controller: AbstractSimController, game: Game): for i, (x, y) in enumerate(self.scenario.enemy_positions): sim_controller.teleport_robot(not game.my_team_is_yellow, i, x, y, 0.0) - sim_controller.teleport_ball(0.0, 0.0) - self._reset_metrics() def _reset_metrics(self): @@ -161,12 +160,6 @@ def test_mirror_swap( endpoint_tolerance=0.3, ) - # For simplicity, use a parallel strategy that runs all robot strategies - # Since we need a single strategy object, we'll create a custom one - from utama_core.tests.motion_planning.strategies.multi_robot_navigation_strategy import ( - MultiRobotNavigationStrategy, - ) - my_strategy = MultiRobotNavigationStrategy( robot_targets={i: right_positions[i] for i in range(len(left_positions))} ) @@ -182,6 +175,7 @@ def test_mirror_swap( mode=mode, exp_friendly=6, exp_enemy=6, + exp_ball=False, opp_strategy=opp_strategy, ) @@ -257,10 +251,6 @@ def test_diagonal_cross_square( endpoint_tolerance=0.25, ) - from utama_core.tests.motion_planning.strategies.multi_robot_navigation_strategy import ( - MultiRobotNavigationStrategy, - ) - my_strategy = MultiRobotNavigationStrategy( robot_targets={i: yellow_targets[i] for i in range(len(yellow_positions))} ) @@ -274,6 +264,7 @@ def test_diagonal_cross_square( mode=mode, exp_friendly=2, exp_enemy=2, + exp_ball=False, opp_strategy=opp_strategy, ) diff --git a/utama_core/tests/motion_planning/random_movement_test.py b/utama_core/tests/motion_planning/random_movement_test.py index 85a81426..2f364406 100644 --- a/utama_core/tests/motion_planning/random_movement_test.py +++ b/utama_core/tests/motion_planning/random_movement_test.py @@ -8,16 +8,16 @@ from utama_core.config.physical_constants import ROBOT_RADIUS from utama_core.entities.data.vector import Vector2D from utama_core.entities.game import Game -from utama_core.entities.game.field import Field +from utama_core.entities.game.field import Field, FieldBounds from utama_core.run import StrategyRunner +from utama_core.strategy.examples.motion_planning.random_movement_strategy import ( + RandomMovementStrategy, +) from utama_core.team_controller.src.controllers import AbstractSimController from utama_core.tests.common.abstract_test_manager import ( AbstractTestManager, TestingStatus, ) -from utama_core.tests.motion_planning.strategies.random_movement_strategy import ( - RandomMovementStrategy, -) # Fix pygame window position for screen capture os.environ["SDL_VIDEO_WINDOW_POS"] = "100,100" @@ -28,7 +28,7 @@ class RandomMovementScenario: """Configuration for random movement collision test.""" n_robots: int - field_bounds: tuple[tuple[float, float], tuple[float, float]] # ((min_x, max_x), (min_y, max_y)) + field_bounds: FieldBounds min_target_distance: float required_targets_per_robot: int collision_threshold: float = ROBOT_RADIUS * 2.0 @@ -50,7 +50,11 @@ def __init__(self, scenario: RandomMovementScenario): def reset_field(self, sim_controller: AbstractSimController, game: Game): """Reset field with robots in random starting positions within bounds.""" - (min_x, max_x), (min_y, max_y) = self.scenario.field_bounds + bounds = self.scenario.field_bounds + min_x = min(bounds.top_left[0], bounds.bottom_right[0]) + max_x = max(bounds.top_left[0], bounds.bottom_right[0]) + min_y = min(bounds.top_left[1], bounds.bottom_right[1]) + max_y = max(bounds.top_left[1], bounds.bottom_right[1]) # Use a fixed seed for reproducibility across test runs rng = random.Random(42 + self.current_episode_number) @@ -61,8 +65,6 @@ def reset_field(self, sim_controller: AbstractSimController, game: Game): sim_controller.teleport_robot(game.my_team_is_yellow, i, x, y, 0.0) self.targets_reached_count[i] = 0 - sim_controller.teleport_ball(0.0, 0.0) - self._reset_metrics() def _reset_metrics(self): @@ -120,27 +122,28 @@ def test_random_movement_same_team( my_team_is_yellow = True my_team_is_right = False # Yellow on left half - # Define half court bounds for left side (Yellow team) - # Standard SSL field is ~9m x 6m, so half court is ~4.5m x 6m - # Using slightly smaller bounds for safety: -4m to 0m in x, -2.5m to 2.5m in y - X_BUFFER = 0.5 - Y_BUFFER = 1.0 - field_bounds = ( - (-Field.FULL_FIELD_HALF_LENGTH + X_BUFFER, -X_BUFFER), - ( - -Field.FULL_FIELD_HALF_WIDTH + Y_BUFFER, - Field.FULL_FIELD_HALF_WIDTH - Y_BUFFER, + # use small bounds to increase chance of collision + small_bounds = FieldBounds( + top_left=( + -Field.FULL_FIELD_HALF_LENGTH + 1, + Field.FULL_FIELD_HALF_WIDTH - 1, ), - ) # ((min_x, max_x), (min_y, max_y)) + bottom_right=( + -Field.FULL_FIELD_HALF_LENGTH + 3, + Field.FULL_FIELD_HALF_WIDTH - 3, + ), + ) # Max is 6 robots n_robots = 2 + seed = 42 + scenario = RandomMovementScenario( n_robots=n_robots, - field_bounds=field_bounds, + field_bounds=small_bounds, min_target_distance=1.0, # Minimum distance for next target - required_targets_per_robot=3, # Each robot must reach 3 targets + required_targets_per_robot=5, # Each robot must reach 5 targets endpoint_tolerance=0.3, ) @@ -149,11 +152,11 @@ def test_random_movement_same_team( # Create random movement strategy strategy = RandomMovementStrategy( n_robots=n_robots, - field_bounds=field_bounds, + field_bounds=small_bounds, min_target_distance=scenario.min_target_distance, + seed=seed, endpoint_tolerance=scenario.endpoint_tolerance, - test_manager=test_manager, - speed_range=(0.5, 1.0), # Random speed between 0.5 and 1.0 m/s + on_target_reached=test_manager.update_target_reached, ) runner = StrategyRunner( @@ -163,6 +166,7 @@ def test_random_movement_same_team( mode=mode, exp_friendly=n_robots, exp_enemy=0, + exp_ball=False, ) test_passed = runner.run_test( diff --git a/utama_core/tests/motion_planning/single_robot_moving_obstacle_test.py b/utama_core/tests/motion_planning/single_robot_moving_obstacle_test.py index 35ff8353..23d64f14 100644 --- a/utama_core/tests/motion_planning/single_robot_moving_obstacle_test.py +++ b/utama_core/tests/motion_planning/single_robot_moving_obstacle_test.py @@ -6,17 +6,15 @@ from utama_core.entities.data.vector import Vector2D from utama_core.entities.game import Game from utama_core.run import StrategyRunner +from utama_core.strategy.examples import ( + OscillatingObstacleStrategy, + SimpleNavigationStrategy, +) from utama_core.team_controller.src.controllers import AbstractSimController from utama_core.tests.common.abstract_test_manager import ( AbstractTestManager, TestingStatus, ) -from utama_core.tests.motion_planning.strategies.oscillating_obstacle_strategy import ( - OscillatingObstacleStrategy, -) -from utama_core.tests.motion_planning.strategies.simple_navigation_strategy import ( - SimpleNavigationStrategy, -) @dataclass @@ -74,12 +72,6 @@ def reset_field(self, sim_controller: AbstractSimController, game: Game): 0.0, ) - # Place ball at target (for visual reference) - sim_controller.teleport_ball( - self.scenario.target_position[0], - self.scenario.target_position[1], - ) - self._reset_metrics() def _reset_metrics(self): @@ -197,6 +189,7 @@ def test_single_robot_moving_obstacles( mode=mode, exp_friendly=1, exp_enemy=len(scenario.moving_obstacles), + exp_ball=False, opp_strategy=opp_strategy, opp_control_scheme="pid", # Use PID so obstacles follow exact paths without avoiding the robot ) diff --git a/utama_core/tests/motion_planning/single_robot_static_obstacle_test.py b/utama_core/tests/motion_planning/single_robot_static_obstacle_test.py index 164541ef..d69c6ea3 100644 --- a/utama_core/tests/motion_planning/single_robot_static_obstacle_test.py +++ b/utama_core/tests/motion_planning/single_robot_static_obstacle_test.py @@ -6,14 +6,12 @@ from utama_core.entities.data.vector import Vector2D from utama_core.entities.game import Game from utama_core.run import StrategyRunner +from utama_core.strategy.examples import SimpleNavigationStrategy from utama_core.team_controller.src.controllers import AbstractSimController from utama_core.tests.common.abstract_test_manager import ( AbstractTestManager, TestingStatus, ) -from utama_core.tests.motion_planning.strategies.simple_navigation_strategy import ( - SimpleNavigationStrategy, -) @dataclass @@ -53,12 +51,6 @@ def reset_field(self, sim_controller: AbstractSimController, game: Game): for i, (x, y) in enumerate(self.scenario.obstacle_positions): sim_controller.teleport_robot(not game.my_team_is_yellow, i, x, y, 0.0) - # Place ball at target (for visual reference) - sim_controller.teleport_ball( - self.scenario.target_position[0], - self.scenario.target_position[1], - ) - self._reset_metrics() def _reset_metrics(self): @@ -160,6 +152,7 @@ def test_collision_avoidance_goal_to_goal( mode=mode, exp_friendly=1, exp_enemy=len(obstacle_config["obstacles"]), + exp_ball=False, ) test_manager = CollisionAvoidanceTestManager(scenario=scenario, robot_id=robot_id) @@ -209,6 +202,7 @@ def test_simple_straight_line_no_obstacles( mode=mode, exp_friendly=1, exp_enemy=0, + exp_ball=False, ) test_manager = CollisionAvoidanceTestManager(scenario=scenario, robot_id=robot_id) diff --git a/utama_core/tests/motion_planning/strategies/random_movement_strategy.py b/utama_core/tests/motion_planning/strategies/random_movement_strategy.py deleted file mode 100644 index 1555d748..00000000 --- a/utama_core/tests/motion_planning/strategies/random_movement_strategy.py +++ /dev/null @@ -1,210 +0,0 @@ -"""Strategy for random movement within bounded area.""" - -from __future__ import annotations - -import random -from typing import Optional, Tuple - -import py_trees - -from utama_core.entities.data.vector import Vector2D -from utama_core.entities.game.field import FieldBounds -from utama_core.skills.src.utils.move_utils import move -from utama_core.strategy.common.abstract_behaviour import AbstractBehaviour -from utama_core.strategy.common.abstract_strategy import AbstractStrategy -from utama_core.tests.common.abstract_test_manager import AbstractTestManager - - -class RandomMovementBehaviour(AbstractBehaviour): - """ - Behaviour that makes a robot move to random targets within bounds. - - Args: - robot_id: The robot ID to control - field_bounds: ((min_x, max_x), (min_y, max_y)) bounds for movement - min_target_distance: Minimum distance for selecting next target - endpoint_tolerance: Distance to consider target reached - test_manager: Manager used to record test-related metrics such as targets reached - speed_range: (min_speed, max_speed) for random speed selection - """ - - def __init__( - self, - robot_id: int, - field_bounds: Tuple[Tuple[float, float], Tuple[float, float]], - min_target_distance: float, - endpoint_tolerance: float, - test_manager: AbstractTestManager, - speed_range: Tuple[float, float] = (0.5, 2.0), - ): - super().__init__(name=f"RandomMovement_{robot_id}") - self.robot_id = robot_id - self.field_bounds = field_bounds - self.min_target_distance = min_target_distance - self.endpoint_tolerance = endpoint_tolerance - self.speed_range = speed_range - - self.current_target = None - self.current_speed = None - - self.test_manager = test_manager - - def _generate_random_target(self, current_pos: Vector2D) -> Vector2D: - """Generate a random target position within bounds, min distance away from current position.""" - MAX_ATTEMPTS = 50 - PADDING = 0.3 - - (min_x, max_x), (min_y, max_y) = self.field_bounds - - for _ in range(MAX_ATTEMPTS): - x = random.uniform(min_x + PADDING, max_x - PADDING) - y = random.uniform(min_y + PADDING, max_y - PADDING) - target = Vector2D(x, y) - - # Check if target is far enough from current position - distance = current_pos.distance_to(target) - if distance >= self.min_target_distance: - return target - - # Fallback: just return a random position even if too close - x = random.uniform(min_x + PADDING, max_x - PADDING) - y = random.uniform(min_y + PADDING, max_y - PADDING) - return Vector2D(x, y) - - def initialise(self): - """Initialize with a random target and speed.""" - # Will set target on first update when we have robot position - self.current_target = None - self.current_speed = random.uniform(*self.speed_range) - - def update(self) -> py_trees.common.Status: - """Command robot to move to random targets.""" - game = self.blackboard.game - rsim_env = self.blackboard.rsim_env - - if not game.friendly_robots or self.robot_id not in game.friendly_robots: - return py_trees.common.Status.RUNNING - - robot = game.friendly_robots[self.robot_id] - robot_pos = Vector2D(robot.p.x, robot.p.y) - - # Generate initial target if needed - if self.current_target is None: - self.current_target = self._generate_random_target(robot_pos) - self.current_speed = random.uniform(*self.speed_range) - - # Check if target reached - distance_to_target = robot_pos.distance_to(self.current_target) - if distance_to_target <= self.endpoint_tolerance: - # Target reached! Generate new target - self.test_manager.update_target_reached(self.robot_id) - - # Generate new target and speed - self.current_target = self._generate_random_target(robot_pos) - self.current_speed = random.uniform(*self.speed_range) - - # Visualize target - if rsim_env: - rsim_env.draw_point(self.current_target.x, self.current_target.y, color="green") - # Draw a line to show path - rsim_env.draw_line( - [ - (robot_pos.x, robot_pos.y), - (self.current_target.x, self.current_target.y), - ], - color="blue", - width=1, - ) - - # Generate movement command - cmd = move( - game, - self.blackboard.motion_controller, - self.robot_id, - self.current_target, - 0.0, # Face forward - ) - - self.blackboard.cmd_map[self.robot_id] = cmd - return py_trees.common.Status.RUNNING - - -class RandomMovementStrategy(AbstractStrategy): - """ - Strategy that controls multiple robots moving randomly within bounds. - - Args: - n_robots: Number of robots to control - field_bounds: ((min_x, max_x), (min_y, max_y)) bounds for movement - min_target_distance: Minimum distance for selecting next target - endpoint_tolerance: Distance to consider target reached - test_manager: Manager used to record test-related metrics such as targets reached - speed_range: (min_speed, max_speed) for random speed selection - """ - - def __init__( - self, - n_robots: int, - field_bounds: Tuple[Tuple[float, float], Tuple[float, float]], - min_target_distance: float, - endpoint_tolerance: float, - test_manager: AbstractTestManager, - speed_range: Tuple[float, float] = (0.5, 2.0), - ): - self.n_robots = n_robots - self.field_bounds = field_bounds - self.min_target_distance = min_target_distance - self.endpoint_tolerance = endpoint_tolerance - self.speed_range = speed_range - self.test_manager = test_manager - super().__init__() - - def assert_exp_robots(self, n_runtime_friendly: int, n_runtime_enemy: int): - """Requires number of friendly robots to match.""" - return n_runtime_friendly == self.n_robots - - def assert_exp_goals(self, includes_my_goal_line: bool, includes_opp_goal_line: bool): - return True - - def get_min_bounding_zone(self) -> Optional[FieldBounds]: - """Return the movement bounds.""" - (min_x, max_x), (min_y, max_y) = self.field_bounds - - PADDING = 0.5 - return FieldBounds( - top_left=(min_x - PADDING, max_y + PADDING), - bottom_right=(max_x + PADDING, min_y - PADDING), - ) - - def create_behaviour_tree(self) -> py_trees.behaviour.Behaviour: - """Create parallel behaviour tree with all robot random movement behaviours.""" - if self.n_robots == 1: - # Single robot - return RandomMovementBehaviour( - robot_id=0, - field_bounds=self.field_bounds, - min_target_distance=self.min_target_distance, - endpoint_tolerance=self.endpoint_tolerance, - speed_range=self.speed_range, - test_manager=self.test_manager, - ) - - # Multiple robots - create parallel behaviours - behaviours = [] - for robot_id in range(self.n_robots): - behaviour = RandomMovementBehaviour( - robot_id=robot_id, - field_bounds=self.field_bounds, - min_target_distance=self.min_target_distance, - endpoint_tolerance=self.endpoint_tolerance, - speed_range=self.speed_range, - test_manager=self.test_manager, - ) - behaviours.append(behaviour) - - # Run all robot behaviours in parallel - return py_trees.composites.Parallel( - name="RandomMovement", - policy=py_trees.common.ParallelPolicy.SuccessOnAll(), - children=behaviours, - ) diff --git a/utama_core/tests/strategy_runner/strat_runner_test_utils.py b/utama_core/tests/strategy_runner/strat_runner_test_utils.py index f931ade9..5cad1a1d 100644 --- a/utama_core/tests/strategy_runner/strat_runner_test_utils.py +++ b/utama_core/tests/strategy_runner/strat_runner_test_utils.py @@ -1,4 +1,6 @@ class DummyStrategy: + exp_ball: bool = True # Not relevant for these tests + def assert_exp_robots(self, exp_friendly, exp_enemy): return True diff --git a/utama_core/tests/strategy_runner/test_exp_ball.py b/utama_core/tests/strategy_runner/test_exp_ball.py new file mode 100644 index 00000000..3049117a --- /dev/null +++ b/utama_core/tests/strategy_runner/test_exp_ball.py @@ -0,0 +1,277 @@ +"""Tests that exp_ball is correctly handled by StrategyRunner. + +Covers three layers: + 1. Constructor-level misconfig — strategy.exp_ball != runner exp_ball raises RuntimeError. + 2. Constructor-level agreement — no error raised when both sides agree. + 3. Integration — the actual game frame reflects the expected ball presence/absence. +""" + +import os +from typing import Optional + +import py_trees +import pytest + +from utama_core.entities.game import Game +from utama_core.entities.game.field import FieldBounds +from utama_core.run.strategy_runner import StrategyRunner +from utama_core.strategy.common.abstract_strategy import AbstractStrategy +from utama_core.team_controller.src.controllers import AbstractSimController +from utama_core.tests.common.abstract_test_manager import ( + AbstractTestManager, + TestingStatus, +) +from utama_core.tests.strategy_runner.strat_runner_test_utils import DummyStrategy + +os.environ["SDL_VIDEO_WINDOW_POS"] = "100,100" + + +# --------------------------------------------------------------------------- +# Minimal idle strategies with controllable exp_ball +# --------------------------------------------------------------------------- + + +class _IdleWithBallStrategy(AbstractStrategy): + """Idle strategy that expects the ball to be present (exp_ball=True).""" + + exp_ball: bool = True + + def create_behaviour_tree(self) -> py_trees.behaviour.Behaviour: + return py_trees.behaviours.Success(name="Idle") + + def assert_exp_robots(self, n_runtime_friendly: int, n_runtime_enemy: int) -> bool: + return True + + def assert_exp_goals(self, includes_my_goal_line: bool, includes_opp_goal_line: bool) -> bool: + return True + + def get_min_bounding_zone(self) -> Optional[FieldBounds]: + return None + + +class _IdleNoBallStrategy(AbstractStrategy): + """Idle strategy that expects NO ball (exp_ball=False).""" + + exp_ball: bool = False + + def create_behaviour_tree(self) -> py_trees.behaviour.Behaviour: + return py_trees.behaviours.Success(name="Idle") + + def assert_exp_robots(self, n_runtime_friendly: int, n_runtime_enemy: int) -> bool: + return True + + def assert_exp_goals(self, includes_my_goal_line: bool, includes_opp_goal_line: bool) -> bool: + return True + + def get_min_bounding_zone(self) -> Optional[FieldBounds]: + return None + + +# --------------------------------------------------------------------------- +# 1. Misconfig tests (constructor-level, no simulation needed) +# --------------------------------------------------------------------------- + + +def test_exp_ball_mismatch_strategy_true_runner_false(): + """Strategy expects ball but runner says exp_ball=False → RuntimeError.""" + strat = DummyStrategy() + strat.exp_ball = True + with pytest.raises(RuntimeError, match="Ball expected"): + StrategyRunner( + strategy=strat, + my_team_is_yellow=True, + my_team_is_right=True, + mode="rsim", + exp_friendly=3, + exp_enemy=3, + exp_ball=False, + ) + + +def test_exp_ball_mismatch_strategy_false_runner_true(): + """ + Strategy expects no ball but runner says exp_ball=True → No issue, runner remains exp_ball=True. + This is a design choice to allow strategies to be agnostic to ball presence if they don't care about it. + """ + strat = DummyStrategy() + strat.exp_ball = False + run = StrategyRunner( + strategy=strat, + my_team_is_yellow=True, + my_team_is_right=True, + mode="rsim", + exp_friendly=3, + exp_enemy=3, + exp_ball=True, + ) + assert run.exp_ball is True, "Runner should remain exp_ball=True even if strategy has exp_ball=False" + + +# --------------------------------------------------------------------------- +# 2. Agreement tests (constructor-level, no simulation needed) +# --------------------------------------------------------------------------- + + +def test_exp_ball_agreement_true_does_not_raise(): + """Strategy and runner both have exp_ball=True → no error, flag is set.""" + strat = DummyStrategy() + strat.exp_ball = True + runner = StrategyRunner( + strategy=strat, + my_team_is_yellow=True, + my_team_is_right=True, + mode="rsim", + exp_friendly=3, + exp_enemy=3, + exp_ball=True, + ) + assert runner.exp_ball is True + + +def test_exp_ball_agreement_false_does_not_raise(): + """Strategy and runner both have exp_ball=False → no error, flag is set.""" + strat = DummyStrategy() + strat.exp_ball = False + runner = StrategyRunner( + strategy=strat, + my_team_is_yellow=True, + my_team_is_right=True, + mode="rsim", + exp_friendly=3, + exp_enemy=3, + exp_ball=False, + ) + assert runner.exp_ball is False + + +# --------------------------------------------------------------------------- +# 3. Integration tests — verify actual game frame state +# --------------------------------------------------------------------------- + + +class _BallPresentManager(AbstractTestManager): + """Records whether game.ball is non-None on the first eval call, then passes.""" + + n_episodes = 1 + + def __init__(self): + super().__init__() + self.ball_seen: Optional[bool] = None + + def reset_field(self, sim_controller: AbstractSimController, game: Game): + sim_controller.teleport_robot(game.my_team_is_yellow, 0, -3.0, 0.0) + sim_controller.teleport_ball(0.0, 0.0) + + def eval_status(self, game: Game) -> TestingStatus: + if self.ball_seen is None: + self.ball_seen = game.ball is not None + return TestingStatus.SUCCESS + + +class _BallAbsentManager(AbstractTestManager): + """Verifies game.ball is None on EVERY eval call when exp_ball=False. + + Returns FAILURE immediately on the first frame where a ball is seen, so + the test fails fast with a clear signal. Returns SUCCESS after + N_FRAMES_TO_CHECK frames with no ball, confirming the invariant holds + throughout and is not just an initial transient. + """ + + n_episodes = 1 + N_FRAMES_TO_CHECK = 20 + + def __init__(self): + super().__init__() + self.frames_checked: int = 0 + + def reset_field(self, sim_controller: AbstractSimController, game: Game): + sim_controller.teleport_robot(game.my_team_is_yellow, 0, -3.0, 0.0) + # No ball teleport — ball has already been removed off-field by StrategyRunner + + def eval_status(self, game: Game) -> TestingStatus: + self.frames_checked += 1 + if game.ball is not None: + return TestingStatus.FAILURE + if self.frames_checked >= self.N_FRAMES_TO_CHECK: + return TestingStatus.SUCCESS + return TestingStatus.IN_PROGRESS + + +def test_exp_ball_true_ball_present_in_game(): + """When exp_ball=True, game.ball must be non-None in the first game frame.""" + tm = _BallPresentManager() + runner = StrategyRunner( + strategy=_IdleWithBallStrategy(), + my_team_is_yellow=True, + my_team_is_right=False, + mode="rsim", + exp_friendly=1, + exp_enemy=0, + exp_ball=True, + ) + passed = runner.run_test(tm, episode_timeout=5.0, rsim_headless=True) + assert passed + assert tm.ball_seen is True, "game.ball should be non-None when exp_ball=True" + + +def test_exp_ball_true_ball_present_in_game_with_filtering(): + """Same as above but with Kalman filtering enabled.""" + tm = _BallPresentManager() + runner = StrategyRunner( + strategy=_IdleWithBallStrategy(), + my_team_is_yellow=True, + my_team_is_right=False, + mode="rsim", + exp_friendly=1, + exp_enemy=0, + exp_ball=True, + filtering=True, + ) + passed = runner.run_test(tm, episode_timeout=5.0, rsim_headless=True) + assert passed + assert tm.ball_seen is True, "game.ball should be non-None when exp_ball=True (filtering=True)" + + +def test_exp_ball_false_ball_absent_in_game(): + """When exp_ball=False, game.ball must be None on every game frame throughout the episode.""" + tm = _BallAbsentManager() + runner = StrategyRunner( + strategy=_IdleNoBallStrategy(), + my_team_is_yellow=True, + my_team_is_right=False, + mode="rsim", + exp_friendly=1, + exp_enemy=0, + exp_ball=False, + ) + passed = runner.run_test(tm, episode_timeout=10.0, rsim_headless=True) + assert passed, ( + f"game.ball was non-None within the first {tm.frames_checked} frames — " + "expected None on every frame when exp_ball=False" + ) + assert ( + tm.frames_checked >= _BallAbsentManager.N_FRAMES_TO_CHECK + ), "Test timed out before reaching the required number of frames" + + +def test_exp_ball_false_ball_absent_in_game_with_filtering(): + """Same as above but with Kalman filtering enabled — the filter must not impute a ball.""" + tm = _BallAbsentManager() + runner = StrategyRunner( + strategy=_IdleNoBallStrategy(), + my_team_is_yellow=True, + my_team_is_right=False, + mode="rsim", + exp_friendly=1, + exp_enemy=0, + exp_ball=False, + filtering=True, + ) + passed = runner.run_test(tm, episode_timeout=10.0, rsim_headless=True) + assert passed, ( + f"game.ball was non-None within the first {tm.frames_checked} frames — " + "expected None on every frame when exp_ball=False (filtering=True)" + ) + assert ( + tm.frames_checked >= _BallAbsentManager.N_FRAMES_TO_CHECK + ), "Test timed out before reaching the required number of frames" diff --git a/utama_core/tests/strategy_runner/test_runner_misconfig.py b/utama_core/tests/strategy_runner/test_runner_misconfig.py index edf8bb5a..417e4a1a 100644 --- a/utama_core/tests/strategy_runner/test_runner_misconfig.py +++ b/utama_core/tests/strategy_runner/test_runner_misconfig.py @@ -30,22 +30,22 @@ def test_load_mode_invalid(base_runner): def test_assert_exp_robots_valid(base_runner): - base_runner._assert_exp_robots(3, 3) # Should not raise + base_runner._assert_exp_robots_and_ball(3, 3, True) # Should not raise def test_assert_exp_robots_too_many_friendly(base_runner): - with pytest.raises(AssertionError): - base_runner._assert_exp_robots(999, 3) + with pytest.raises(ValueError): + base_runner._assert_exp_robots_and_ball(999, 3, True) def test_assert_exp_robots_too_few_friendly(base_runner): - with pytest.raises(AssertionError): - base_runner._assert_exp_robots(0, 3) + with pytest.raises(ValueError): + base_runner._assert_exp_robots_and_ball(0, 3, True) def test_assert_exp_robots_too_many_enemy(base_runner): - with pytest.raises(AssertionError): - base_runner._assert_exp_robots(3, 999) + with pytest.raises(ValueError): + base_runner._assert_exp_robots_and_ball(3, 999, True) def test_assert_exp_goals_fails(base_runner):