From 583fff84139f33ad8505438482759c726b4aae46 Mon Sep 17 00:00:00 2001 From: Digoox99 Date: Sun, 7 Dec 2025 21:14:00 -0300 Subject: [PATCH] =?UTF-8?q?otimiza=C3=A7=C3=A3o=20numpy=20na=20blocks=5Fwo?= =?UTF-8?q?rld=5Fstate=20e=20planning?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/domain/blocks_world_state.py | 86 ++++++++++++++++++++--------- src/domain/planning.py | 94 +++++++++++++++++--------------- 2 files changed, 109 insertions(+), 71 deletions(-) diff --git a/src/domain/blocks_world_state.py b/src/domain/blocks_world_state.py index 038c4c1..4b53f31 100644 --- a/src/domain/blocks_world_state.py +++ b/src/domain/blocks_world_state.py @@ -1,41 +1,74 @@ +import numpy as np from typing import Generator, Set class BlocksWorldState: - def __init__(self, current_state: Set[int], actions: dict[str, dict[str, Set[int]]], name: str = 'root', parent: BlocksWorldState | None = None) -> None: - if self.__is_valid_state(current_state) is False: - raise ValueError('The current state is not valid.') + def __init__(self, current_state: Set[int], + actions: dict[str, dict[str, Set[int]]], + name: str = 'root', + parent: 'BlocksWorldState | None' = None) -> None: + + self.n_props = max(abs(x) for x in current_state) + 1 + self.state_vec = np.zeros(self.n_props, dtype=np.bool_) + for fact in current_state: + if fact > 0: + self.state_vec[fact] = True + + self.key = self.state_vec.tobytes() + self.actions_masks = self.__convert_actions(actions) - self.current = set(current_state) - self.key = str(sorted(self.current)) - self.avaliable_actions = self.__filter_avaliable_actions(actions) self.identifier = name self.parent = parent - def successors(self, actions: dict[str, dict[str, Set[int]]]) -> Generator['BlocksWorldState', None, None]: - for name, action in self.avaliable_actions.items(): - new_state = self.__expand(name, action, actions) - yield new_state + def successors(self, actions: dict[str, dict[str, Set[int]]]) \ + -> Generator['BlocksWorldState', None, None]: + + for name, masks in self.actions_masks.items(): + pre_mask, add_mask, del_mask = masks + if np.all(self.state_vec[pre_mask]): + yield self.__expand(name, add_mask, del_mask, actions) + + def __expand(self, action_name: str, + add_mask: np.ndarray, + del_mask: np.ndarray, + actions: dict[str, dict[str, Set[int]]]) -> 'BlocksWorldState': + + new_vec = self.state_vec.copy() + new_vec[del_mask] = False + new_vec[add_mask] = True + new_state_set = {i for i in range(len(new_vec)) if new_vec[i]} + + return BlocksWorldState(new_state_set, actions, + action_name, parent=self) + + def __convert_actions(self, actions: dict[str, dict[str, Set[int]]]): + + converted = {} + + for name, cond in actions.items(): + + pre = cond['pre'] + post = cond['post'] + + max_index = self.n_props - def __expand(self, action_name: str, action: dict[str, Set[int]], actions: dict[str, dict[str, Set[int]]]) -> BlocksWorldState: - transition_state = self.current - action['pre'] - new_state = self.__resolve_consistent_state( - transition_state, action['post']) + pre_mask = np.zeros(max_index, dtype=np.bool_) + add_mask = np.zeros(max_index, dtype=np.bool_) + del_mask = np.zeros(max_index, dtype=np.bool_) - return BlocksWorldState(new_state, actions, action_name, parent=self) + for p in pre: + if p > 0: + pre_mask[p] = True - def __filter_avaliable_actions(self, actions: dict[str, dict[str, Set[int]]]) -> dict[str, dict[str, Set[int]]]: - return { - name: condition for name, condition in actions.items() - if condition['pre'].issubset(self.current) - } + for p in post: + if p > 0: + add_mask[p] = True + else: + del_mask[abs(p)] = True - def __resolve_consistent_state(self, transition_state: Set[int], post_state: Set[int]) -> Set[int]: - only_positive_facts = {fact for fact in post_state if fact > 0} - return transition_state.union(only_positive_facts) + converted[name] = (pre_mask, add_mask, del_mask) - def __is_valid_state(self, state: Set[int]) -> bool: - return len(state) == len({abs(fact) for fact in state}) + return converted def __hash__(self) -> int: return hash(self.key) @@ -44,4 +77,5 @@ def __eq__(self, other: object) -> bool: return isinstance(other, BlocksWorldState) and self.key == other.key def __repr__(self) -> str: - return f'State({self.current})' + true_indices = [i for i in range(len(self.state_vec)) if self.state_vec[i]] + return f"State({true_indices})" diff --git a/src/domain/planning.py b/src/domain/planning.py index 008f308..f1f3100 100644 --- a/src/domain/planning.py +++ b/src/domain/planning.py @@ -1,6 +1,7 @@ from typing import Generator, Set import time import tracemalloc + from src.support.factories.algorithm_factory import AlgorithmFactory from src.domain.contracts.local_search_algorithm import LocalSearchAlgorithm from src.domain.contracts.planning_contract import PlanningContract @@ -9,16 +10,17 @@ class Planning(PlanningContract): + def __init__(self, strips: StripsNotation): self.__map = self.__map_clauses(strips) self.__actions = self.__resolve_actions(strips.actions) - states = strips.states self.__initial_state = self.__resolve_facts(states['initial']) self.__goal_state = self.__resolve_facts(states['goal']) - self.__state_space = BlocksWorldState( - self.__initial_state, self.__actions) + self.__initial_state, + self.__actions + ) self.__planner: LocalSearchAlgorithm | None = None @@ -37,80 +39,82 @@ def actions(self) -> dict[str, dict[str, Set[int]]]: @property def states(self) -> dict[str, Set[int]]: return { - 'initial': self.__initial_state, - 'goal': self.__goal_state, + "initial": self.__initial_state, + "goal": self.__goal_state } def successors(self) -> Generator[BlocksWorldState, None, None]: return self.__state_space.successors(self.__actions) def solution(self, goal_state: BlocksWorldState) -> list[str]: - solution_path: list[str] = [] - + caminho: list[str] = [] while goal_state.parent is not None: - solution_path.insert(0, goal_state.identifier) + caminho.insert(0, goal_state.identifier) goal_state = goal_state.parent - return solution_path + return caminho - def set_algoritm(self, algorithm_key: str) -> None: - self.__planner = AlgorithmFactory.make(algorithm_key, self) + def set_algoritm(self, key: str) -> None: + self.__planner = AlgorithmFactory.make(key, self) def execute(self) -> None: if self.__planner is None: - raise AssertionError('The algorithm is not set') + raise AssertionError("Algorithm not set") tracemalloc.start() - start = time.perf_counter() - result, expansions, explorations = self.__planner.execute() - elapsed = time.perf_counter() - start - current, peak = tracemalloc.get_traced_memory() + inicio = time.perf_counter() + + resultado, expandidos, explorados = self.__planner.execute() + + tempo = time.perf_counter() - inicio + atual, pico = tracemalloc.get_traced_memory() tracemalloc.stop() - self.__display_result(result, expansions, explorations, elapsed, current, peak) + self.__mostrar(resultado, expandidos, explorados, tempo, atual, pico) - def __display_result(self, result: list[str] | None, expansions: int, explorations: int, elapsed: float, current: int, peak: int) -> None: - algo_name = type( - self.__planner).__name__ if self.__planner is not None else None + def __mostrar(self, result, exp, explor, tempo, atual, pico): + nome_algo = type(self.__planner).__name__ if self.__planner else None print("=" * 60) print("Execution summary".center(60)) print("=" * 60) - print(f"Algorithm : {algo_name}") - print(f"Time elapsed : {elapsed:.6f} s") - print(f"Expanded nodes : {expansions}") - print(f"Explored nodes : {expansions}") - print( - f"Memory usage : current={current / 1024:.2f} KB; peak={peak / 1024:.2f} KB") + print(f"Algorithm : {nome_algo}") + print(f"Time elapsed : {tempo:.6f} s") + print(f"Expanded nodes : {exp}") + print(f"Explored nodes : {explor}") + print(f"Memory usage : current={atual/1024:.2f} KB; peak={pico/1024:.2f} KB") print("-" * 60) if result: - print( - f"Solution ({len(result)} step{'s' if len(result) != 1 else ''}):") - for i, step in enumerate(result, start=1): - print(f" {i:2d}. {step}") + print(f"Solution ({len(result)} steps):") + for i, step in enumerate(result, 1): + print(f"{i:2d}. {step}") else: print("No solution found") print("=" * 60) def __map_clauses(self, strips: StripsNotation) -> dict[str, int]: - action_hook: dict[str, int] = {} + tabela: dict[str, int] = {} + contador = 1 for fact in strips.avaliable_facts: - if fact not in action_hook.keys(): - action_hook[fact] = abs(action_hook.get(f'~{fact}', len( - action_hook) + 1)) if fact.startswith('~') is False else -action_hook.get( - fact[1:], len(action_hook) + 1) - return action_hook + if fact not in tabela: + tabela[fact] = contador + complemento = "~" + fact + if complemento not in tabela: + tabela[complemento] = -contador + contador += 1 - def __resolve_facts(self, target: list[str]) -> Set[int]: - return {self.__map[fact] for fact in target} + return tabela - def __resolve_actions(self, actions: dict[str, dict[str, list[str]]]) -> dict[str, dict[str, Set[int]]]: - return { - action_name: { - 'pre': self.__resolve_facts(conditions['pre']), - 'post': self.__resolve_facts(conditions['post']), + def __resolve_facts(self, target: list[str]) -> Set[int]: + return {self.__map[p] for p in target} + + def __resolve_actions(self, actions: dict[str, dict[str, list[str]]]): + convertido = {} + for nome, cond in actions.items(): + convertido[nome] = { + "pre": self.__resolve_facts(cond["pre"]), + "post": self.__resolve_facts(cond["post"]), } - for action_name, conditions in actions.items() - } + return convertido