From 6d8c4d0bb0673ae1e6807a41a2d22c34e5fa2983 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tobias=20J=C3=BClg?=
Date: Tue, 26 Aug 2025 12:24:53 +0200
Subject: [PATCH 1/4] feat: python command/path to start eval envs in

---
 src/agents/evaluator_envs.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/src/agents/evaluator_envs.py b/src/agents/evaluator_envs.py
index be2b7c0..3e47c59 100644
--- a/src/agents/evaluator_envs.py
+++ b/src/agents/evaluator_envs.py
@@ -368,9 +368,11 @@ def run_eval_during_training(
     host="localhost",
     episodes: int = 100,
     n_processes: int | None = None,
+    cmd = None,
 ):
-    cmd = [
-        "python",
+    if cmd is None:
+        cmd = ["python"]
+    cmd += [
         "-m",
         "agents", "run-eval-during-training",
         agent_name,
@@ -409,18 +411,16 @@ def run_eval_post_training(
     host="localhost",
     episodes: int = 100,
     n_processes: int | None = None,
-    video: bool = False,
     n_gpus: int = 1,
+    cmd = None,
 ):
-    if video:
-        run_recordings = os.path.join(output_path, "run_recordings")
-        os.mkdir(run_recordings)
-        for cfg in eval_cfgs:
-            cfg.env_kwargs["video_dir"] = run_recordings
+    if cmd is None:
+        cmd = ["python"]
+
     slurm.sbatch(
         shlex.join(
+            cmd +
             [
-                "python",
                 "-m",
                 "agents",
                 "run-eval-post-training",

From fabfa19ee757dbe487159090817cb464fde803cc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tobias=20J=C3=BClg?=
Date: Tue, 26 Aug 2025 12:25:08 +0200
Subject: [PATCH 2/4] refactor: added agents config

---
 src/agents/__main__.py       | 48 ++++++++++++++++++++++++++----------------------
 src/agents/evaluator_envs.py | 56 +++++++++++++++++++++-----------------------------------
 2 files changed, 47 insertions(+), 57 deletions(-)

diff --git a/src/agents/__main__.py b/src/agents/__main__.py
index 31b1337..9cc18df 100644
--- a/src/agents/__main__.py
+++ b/src/agents/__main__.py
@@ -15,7 +15,7 @@
 # when started from jupyter notebook
 os.environ["MPLBACKEND"] = "Agg"
 
-from agents.evaluator_envs import EvalConfig, evaluation, write_results
+from agents.evaluator_envs import AgentConfig, EvalConfig, evaluation, write_results
 from agents.policies import AGENTS
 from agents.server import AgentService
 
@@ -62,15 +62,16 @@ def start_server(
 
 
 def _per_process(
-    args: tuple[int, dict, list[EvalConfig], str, int, str, int, int | None, int],
+    args: tuple[int, AgentConfig, list[EvalConfig], int, int | None, int],
 ) -> tuple[np.ndarray, list[list[list[float]]], list[float], int]:
-    step, kwargs, eval_cfgs, agent_name, port, host, episodes, n_processes, nth_gpu = args
+    step, _agent_cfg, eval_cfgs, episodes, n_processes, nth_gpu = args
     logging.info(f"Starting evaluation for step {step}")
     os.environ["CUDA_VISIBLE_DEVICES"] = str(nth_gpu)
-    job_kwargs = copy.deepcopy(kwargs)
-    job_kwargs["checkpoint_step"] = step
+    agent_cfg = copy.deepcopy(_agent_cfg)
+    agent_cfg.agent_kwargs["checkpoint_step"] = step
+
     per_env_results_last_reward, per_env_results_rewards = evaluation(
-        agent_name, job_kwargs, eval_cfgs, port, host, episodes, n_processes
+        agent_cfg=agent_cfg, eval_cfgs=eval_cfgs, episodes=episodes, n_processes=n_processes
     )
     logging.info(f"Finished evaluation for step {step}")
     flatten_rewards = [[item for sublist in env_rewards for item in sublist] for env_rewards in per_env_results_rewards]
@@ -81,7 +82,6 @@
 
 @main_app.command()
 def run_eval_post_training(
-    agent_name: Annotated[str, typer.Argument(help="Agent name to run.")],
     wandb_project: Annotated[str, typer.Option(help="weights and biases logging project.")],
     wandb_entity: Annotated[str, typer.Option(help="weights and biases logging entity.")],
     wandb_note: Annotated[str, typer.Option(help="weights and biases logging note.")],
@@ -89,15 +89,15 @@ def run_eval_post_training(
     output_path: Annotated[str, typer.Option(help="Path to store the run results.")],
     wandb_group: Annotated[str | None, typer.Option(help="weights and biases logging name.")] = None,
     steps: Annotated[str | None, typer.Option(help="steps to evaluate.")] = None,
-    kwargs: Annotated[str, typer.Option(help="args to start the agent.")] = "{}",
-    port: Annotated[int, typer.Option(help="Port to run the server on.")] = 8080,
-    host: Annotated[str, typer.Option(help="Host to run the server on.")] = "localhost",
     episodes: Annotated[int, typer.Option(help="Number of episodes to run.")] = 100,
     n_processes: Annotated[int | None, typer.Option(help="Number of processes to run.")] = None,
     n_gpus: Annotated[int, typer.Option(help="Number of gpus to run.")] = 1,
     eval_cfgs: Annotated[
         str, typer.Option(help="Evaluation configurations.")
     ] = '[{"env": "rcs/SimplePickUpSim-v0", "kwargs": {}}]',
+    agent_cfg: Annotated[
+        str, typer.Option(help="Agent configuration.")
+    ] = '{"host": "localhost", "port": 8080, "agent_name": "Test", "agent_kwargs": {}, "python_path": "python"}',
 ):
     """
     post training eval which goes over all checkpoints
@@ -115,7 +115,7 @@
         entity=wandb_entity,
         resume="allow",
         project=wandb_project,
-        config=dict(agent_name=agent_name, agent_kwargs=json.loads(kwargs), eval_cfgs=json.loads(eval_cfgs)),
+        # config=dict(agent_name=agent_name, agent_kwargs=json.loads(kwargs), eval_cfgs=json.loads(eval_cfgs)),
         notes=wandb_note,
         job_type="eval",
         name=wandb_name,
@@ -196,9 +196,13 @@ def run_eval_post_training(
     gpus_ids = [i % n_gpus for i in range(len(steps))]
 
     # spawn n processes and run in parallel
+
+    agent_cfgs = [AgentConfig(**json.loads(agent_cfg)) for _ in range(len(steps))]
+    for idx in range(len(steps)):
+        agent_cfgs[idx].port += idx
     with Pool(n_processes) as p:
         args = [
-            (step, kwargs, eval_cfgs, agent_name, port + idx, host, episodes, 1, gpus_ids[idx])
+            (step, agent_cfgs[idx], eval_cfgs, episodes, 1, gpus_ids[idx])
             for idx, step in enumerate(steps)
         ]
         results = p.map(_per_process, args)
@@ -230,7 +234,7 @@ def run_eval_post_training(
             per_env_results_last_reward,
             per_env_results_rewards,
             eval_cfgs,
-            model_cfg={"agent_name": agent_name, "kwargs": kwargs},
+            model_cfg={},  # {"agent_name": agent_name, "kwargs": kwargs}
             out=output_path,
         )
         wandb.log_artifact(path, type="file", name="results", aliases=[f"step_{step}"])
@@ -238,7 +242,6 @@
 
 @main_app.command()
 def run_eval_during_training(
-    agent_name: Annotated[str, typer.Argument(help="Agent name to run.")],
     wandb_id: Annotated[str, typer.Option(help="weights and biases logging id.")],
     wandb_group: Annotated[str, typer.Option(help="weights and biases logging group.")],
     wandb_project: Annotated[str, typer.Option(help="weights and biases logging project.")],
@@ -247,20 +250,21 @@ def run_eval_during_training(
     wandb_name: Annotated[str, typer.Option(help="weights and biases logging name.")],
     output_path: Annotated[str, typer.Option(help="Path to store the run results.")],
     wandb_first: Annotated[bool, typer.Option(help="whether its the first eval.")] = False,
-    kwargs: Annotated[str, typer.Option(help="args to start the agent.")] = "{}",
-    port: Annotated[int, typer.Option(help="Port to run the server on.")] = 8080,
-    host: Annotated[str, typer.Option(help="Host to run the server on.")] = "localhost",
     episodes: Annotated[int, typer.Option(help="Number of episodes to run.")] = 100,
     n_processes: Annotated[int | None, typer.Option(help="Number of processes to run.")] = None,
     eval_cfgs: Annotated[
         str, typer.Option(help="Evaluation configurations.")
     ] = '[{"env": "rcs/SimplePickUpSim-v0", "kwargs": {}}]',
+    agent_cfg: Annotated[
+        str, typer.Option(help="Agent configuration.")
+    ] = '{"host": "localhost", "port": 8080, "agent_name": "Test", "agent_kwargs": {}, "python_path": "python"}',
 ):
     """
     during training eval, all need to use the same id
     - just for one model, but many envs
     - can be new run but at least in the same project and same group as the training
     """
+    assert json.loads(agent_cfg)["agent_name"] != "Test", "agent_cfg needs to be passed as a json argument. See the default for an example."
 
     if wandb_first:
         wandb.init(
@@ -269,7 +273,7 @@ def run_eval_during_training(
             resume="allow",
             group=wandb_group,
             project=wandb_project,
-            config=dict(agent_name=agent_name, agent_kwargs=json.loads(kwargs), eval_cfgs=json.loads(eval_cfgs)),
+            # config=dict(agent_name=agent_name, agent_kwargs=json.loads(kwargs), eval_cfgs=json.loads(eval_cfgs)),
             notes=wandb_note,
             job_type="eval",
             name=wandb_name,
@@ -281,13 +285,13 @@
         wandb.init(id=wandb_id, entity=wandb_entity, resume="must", project=wandb_project)
 
     eval_cfgs = [EvalConfig(**cfg) for cfg in json.loads(eval_cfgs)]
-    kwargs = json.loads(kwargs)
-    step = kwargs.get("checkpoint_step", 0)
+    agent_cfg = AgentConfig(**json.loads(agent_cfg))
+    step = agent_cfg.agent_kwargs.get("checkpoint_step", 0)
     step = step if step is not None else 0
 
     per_env_results_last_reward, per_env_results_rewards = evaluation(
-        agent_name, kwargs, eval_cfgs, port, host, episodes, n_processes
+        agent_cfg=agent_cfg, eval_cfgs=eval_cfgs, episodes=episodes, n_processes=n_processes
     )
 
     # return is [envs, episodes, 3(success, reward, steps)], [envs, episodes, rewards for all steps in the episode]
@@ -387,7 +391,7 @@ def run_eval_during_training(
             per_env_results_last_reward,
             per_env_results_rewards,
             eval_cfgs,
-            model_cfg={"agent_name": agent_name, "kwargs": kwargs},
+            model_cfg={},  # {"agent_name": agent_name, "kwargs": kwargs}
             out=output_path,
         )
         wandb.log_artifact(path, type="file", name="results", aliases=[f"step_{step}"])
diff --git a/src/agents/evaluator_envs.py b/src/agents/evaluator_envs.py
index 3e47c59..12fd7fd 100644
--- a/src/agents/evaluator_envs.py
+++ b/src/agents/evaluator_envs.py
@@ -202,10 +202,13 @@ class EvalConfig:
 
 
 @dataclass
-class ClientConfig:
+class AgentConfig:
     host: str
-    port: int
-    model: str
+    agent_name: str
+    agent_kwargs: dict[str, Any]
+    python_path: str = "python"
+    """Modify this if you want to use a specific python environment."""
+    port: int = 8080
 
 
 def single_eval(env: EvaluatorEnv, agent: Agent, max_steps: int) -> tuple[list[float], list[float], list[float]]:
@@ -237,23 +240,23 @@ def single_eval(env: EvaluatorEnv, agent: Agent, max_steps: int) -> tuple[list[f
 per_process_cache = {}
 
 
-def create_env_agent(client_config: ClientConfig, cfg: EvalConfig, seed: int) -> tuple[EvaluatorEnv, RemoteAgent]:
+def create_env_agent(agent_config: AgentConfig, cfg: EvalConfig, seed: int) -> tuple[EvaluatorEnv, RemoteAgent]:
     logging.info(f"retrieving env {cfg.env_id} and agent")
     if cfg.env_id not in per_process_cache:
         logging.info(f"env {cfg.env_id} not available, creating new env and agent")
         env = EvaluatorEnv.make(cfg.env_id, seed=seed, **cfg.env_kwargs)
         logging.info("done creating env")
-        agent = RemoteAgent(client_config.host, client_config.port, client_config.model)
+        agent = RemoteAgent(agent_config.host, agent_config.port, agent_config.agent_name)
         logging.info("done creating agent")
         per_process_cache[cfg.env_id] = (env, agent)
     return per_process_cache[cfg.env_id]
 
 
-def per_process(args: tuple[int, list[EvalConfig], int, ClientConfig]) -> tuple[float, float, float]:
+def per_process(args: tuple[int, list[EvalConfig], int, AgentConfig]) -> tuple[float, float, float]:
     logging.info(f"Starting process {args}")
-    i, cfgs, episodes, client_cfg = args
+    i, cfgs, episodes, agent_cfg = args
     cfg = cfgs[i // episodes]
-    env, agent = create_env_agent(client_cfg, cfg, seed=i)
+    env, agent = create_env_agent(agent_cfg, cfg, seed=i)
     # busy wait for server to finish initialization
     while not agent.is_initialized():
         logging.info("Waiting for agent to initialize...")
@@ -262,7 +265,7 @@
 
 
 def multi_eval(
-    client_cfg: ClientConfig, cfgs: list[EvalConfig], episodes: int = 100, n_processes: int = 1
+    agent_cfg: AgentConfig, cfgs: list[EvalConfig], episodes: int = 100, n_processes: int = 1
 ) -> tuple[np.ndarray, list[list[list[float]]]]:
     # return is [envs, episodes, 3(success, reward, steps)], [envs, episodes, rewards for all steps in the episode]
     logging.info(f"Starting evaluation with {len(cfgs)} environments and {episodes} episodes each")
@@ -273,7 +276,7 @@ def multi_eval(
     #     single_results = p.map(per_process, args)
 
     # without process
-    args = [(i, cfgs, episodes, client_cfg) for i in range(len(cfgs) * episodes)]
+    args = [(i, cfgs, episodes, agent_cfg) for i in range(len(cfgs) * episodes)]
     single_results = [per_process(arg) for arg in tqdm(args)]
 
     single_results_last_reward = np.array([(i[0], i[1][-1], i[2]) for i in single_results])
@@ -324,19 +327,14 @@ def start_server(
 
 
 def evaluation(
-    agent_name: str,
-    kwargs: dict[str, Any],
+    agent_cfg: AgentConfig,
     eval_cfgs: list[EvalConfig],
-    port: int = 8080,
-    host: str = "localhost",
     episodes: int = 100,
     n_processes: int = 1,
-    python_path: str = "python",
 ):
-    logging.info(f"Starting evaluation with {agent_name} and {kwargs}")
-    with start_server(agent_name, kwargs, port, host, python_path) as p:
-        client_cfg = ClientConfig(host, port, agent_name)
-        res = multi_eval(client_cfg, eval_cfgs, episodes, n_processes)
+    logging.info(f"Starting evaluation with {agent_cfg.agent_name} and {agent_cfg.agent_kwargs}")
+    with start_server(agent_cfg.agent_name, agent_cfg.agent_kwargs, agent_cfg.port, agent_cfg.host, agent_cfg.python_path) as p:
+        res = multi_eval(agent_cfg, eval_cfgs, episodes, n_processes)
     logging.info("Evaluation finished")
     # send ctrl c signal
     p.send_signal(subprocess.signal.SIGINT)
@@ -352,8 +350,7 @@
 
 
 def run_eval_during_training(
-    agent_name: str,
-    kwargs: dict[str, Any],
+    agent_cfg: AgentConfig,
     eval_cfgs: list[EvalConfig],
     wandb_id: str,
     wandb_entity: str,
@@ -364,8 +361,6 @@
     slurm: Slurm,
     output_path: str,
     wandb_first: bool = False,
-    port=8080,
-    host="localhost",
     episodes: int = 100,
     n_processes: int | None = None,
     cmd = None,
@@ -375,10 +370,7 @@
     cmd += [
         "-m",
         "agents", "run-eval-during-training",
-        agent_name,
-        f"--kwargs={json.dumps(kwargs)}",
-        f"--port={port}",
-        f"--host={host}",
+        f"--agent-cfg={json.dumps(asdict(agent_cfg))}",
         f"--episodes={episodes}",
         f"--n-processes={n_processes}",
         f"--eval-cfgs={json.dumps([asdict(cfg) for cfg in eval_cfgs])}",
@@ -396,8 +388,7 @@
 
 
 def run_eval_post_training(
-    agent_name: str,
-    kwargs: dict[str, Any],
+    agent_cfg: AgentConfig,
     eval_cfgs: list[EvalConfig],
     wandb_entity: str,
     wandb_project: str,
@@ -407,8 +398,6 @@
     slurm: Slurm,
     output_path: str,
     wandb_group: str | None = None,
-    port=8080,
-    host="localhost",
     episodes: int = 100,
     n_processes: int | None = None,
     n_gpus: int = 1,
@@ -424,10 +413,7 @@
                 "-m",
                 "agents",
                 "run-eval-post-training",
-                agent_name,
-                f"--kwargs={json.dumps(kwargs)}",
-                f"--port={port}",
-                f"--host={host}",
+                f"--agent-cfg={json.dumps(asdict(agent_cfg))}",
                 f"--episodes={episodes}",
                 f"--n-processes={n_processes}",
                 f"--eval-cfgs={json.dumps([asdict(cfg) for cfg in eval_cfgs])}",

From 170b5838ef639a9f7937b4256cec424653040ee4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tobias=20J=C3=BClg?=
Date: Tue, 26 Aug 2025 12:42:28 +0200
Subject: [PATCH 3/4] fix: agent config in write results function

---
 src/agents/__main__.py       | 4 ++--
 src/agents/evaluator_envs.py | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/agents/__main__.py b/src/agents/__main__.py
index 9cc18df..a8c1c86 100644
--- a/src/agents/__main__.py
+++ b/src/agents/__main__.py
@@ -234,7 +234,7 @@ def run_eval_post_training(
             per_env_results_last_reward,
             per_env_results_rewards,
             eval_cfgs,
-            model_cfg={},  # {"agent_name": agent_name, "kwargs": kwargs}
+            agent_cfg=agent_cfg,
             out=output_path,
         )
         wandb.log_artifact(path, type="file", name="results", aliases=[f"step_{step}"])
@@ -391,7 +391,7 @@ def run_eval_during_training(
             per_env_results_last_reward,
             per_env_results_rewards,
             eval_cfgs,
-            model_cfg={},  # {"agent_name": agent_name, "kwargs": kwargs}
+            agent_cfg=agent_cfg,
             out=output_path,
         )
         wandb.log_artifact(path, type="file", name="results", aliases=[f"step_{step}"])
diff --git a/src/agents/evaluator_envs.py b/src/agents/evaluator_envs.py
index 12fd7fd..2815b9f 100644
--- a/src/agents/evaluator_envs.py
+++ b/src/agents/evaluator_envs.py
@@ -434,7 +434,7 @@ def write_results(
     results: np.ndarray,
     rewards: list[list[list[float]]],
     eval_cfgs: list[EvalConfig],
-    model_cfg: dict[str, Any],
+    agent_cfg: AgentConfig,
     out: str = "",
 ) -> str:
     # first read json, if not exists write empty list
@@ -488,7 +488,7 @@
                 "episodes": len(results),
                 "timestamp": datetime.datetime.now().isoformat(),
                 "env_cfg": asdict(cfg),
-                "model_cfg": model_cfg,
+                "agent_cfg": asdict(agent_cfg),
             }
         )

From a2afbe8e3bb59e26ee826e825a99b479eaa32154 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tobias=20J=C3=BClg?=
Date: Tue, 26 Aug 2025 12:44:00 +0200
Subject: [PATCH 4/4] style: format

---
 src/agents/__main__.py       |  9 ++++-----
 src/agents/evaluator_envs.py | 12 +++++++-----
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/src/agents/__main__.py b/src/agents/__main__.py
index a8c1c86..392171e 100644
--- a/src/agents/__main__.py
+++ b/src/agents/__main__.py
@@ -201,10 +201,7 @@ def run_eval_post_training(
     for idx in range(len(steps)):
         agent_cfgs[idx].port += idx
     with Pool(n_processes) as p:
-        args = [
-            (step, agent_cfgs[idx], eval_cfgs, episodes, 1, gpus_ids[idx])
-            for idx, step in enumerate(steps)
-        ]
+        args = [(step, agent_cfgs[idx], eval_cfgs, episodes, 1, gpus_ids[idx]) for idx, step in enumerate(steps)]
         results = p.map(_per_process, args)
         logging.info("Finished evaluation")
 
@@ -264,7 +261,9 @@ def run_eval_during_training(
     - just for one model, but many envs
     - can be new run but at least in the same project and same group as the training
     """
-    assert json.loads(agent_cfg)["agent_name"] != "Test", "agent_cfg needs to be passed as a json argument. See the default for an example."
+    assert (
+        json.loads(agent_cfg)["agent_name"] != "Test"
+    ), "agent_cfg needs to be passed as a json argument. See the default for an example."
 
     if wandb_first:
         wandb.init(
diff --git a/src/agents/evaluator_envs.py b/src/agents/evaluator_envs.py
index 2815b9f..9aa1c87 100644
--- a/src/agents/evaluator_envs.py
+++ b/src/agents/evaluator_envs.py
@@ -333,7 +333,9 @@ def evaluation(
     n_processes: int = 1,
 ):
     logging.info(f"Starting evaluation with {agent_cfg.agent_name} and {agent_cfg.agent_kwargs}")
-    with start_server(agent_cfg.agent_name, agent_cfg.agent_kwargs, agent_cfg.port, agent_cfg.host, agent_cfg.python_path) as p:
+    with start_server(
+        agent_cfg.agent_name, agent_cfg.agent_kwargs, agent_cfg.port, agent_cfg.host, agent_cfg.python_path
+    ) as p:
         res = multi_eval(agent_cfg, eval_cfgs, episodes, n_processes)
     logging.info("Evaluation finished")
     # send ctrl c signal
@@ -363,15 +365,15 @@ def run_eval_during_training(
     wandb_first: bool = False,
     episodes: int = 100,
     n_processes: int | None = None,
-    cmd = None,
+    cmd=None,
 ):
     if cmd is None:
         cmd = ["python"]
     cmd += [
         "-m",
         "agents", "run-eval-during-training",
         f"--agent-cfg={json.dumps(asdict(agent_cfg))}",
         f"--episodes={episodes}",
         f"--n-processes={n_processes}",
         f"--eval-cfgs={json.dumps([asdict(cfg) for cfg in eval_cfgs])}",
         f"--wandb-id={wandb_id}",
@@ -401,20 +403,20 @@ def run_eval_post_training(
     episodes: int = 100,
     n_processes: int | None = None,
     n_gpus: int = 1,
-    cmd = None,
+    cmd=None,
 ):
     if cmd is None:
         cmd = ["python"]
 
     slurm.sbatch(
         shlex.join(
-            cmd +
-            [
+            cmd
+            + [
                 "-m",
                 "agents",
                 "run-eval-post-training",
                 f"--agent-cfg={json.dumps(asdict(agent_cfg))}",
                 f"--episodes={episodes}",
                 f"--n-processes={n_processes}",
                 f"--eval-cfgs={json.dumps([asdict(cfg) for cfg in eval_cfgs])}",
                 f"--wandb-group={wandb_group.replace(':', '_') if wandb_group else ''}",