Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 60 additions & 26 deletions src/domain/blocks_world_state.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,74 @@
import numpy as np
from typing import Generator, Set


class BlocksWorldState:
def __init__(self, current_state: Set[int], actions: dict[str, dict[str, Set[int]]], name: str = 'root', parent: BlocksWorldState | None = None) -> None:
if self.__is_valid_state(current_state) is False:
raise ValueError('The current state is not valid.')
def __init__(self, current_state: Set[int],
actions: dict[str, dict[str, Set[int]]],
name: str = 'root',
parent: 'BlocksWorldState | None' = None) -> None:

self.n_props = max(abs(x) for x in current_state) + 1
self.state_vec = np.zeros(self.n_props, dtype=np.bool_)
for fact in current_state:
if fact > 0:
self.state_vec[fact] = True

self.key = self.state_vec.tobytes()
self.actions_masks = self.__convert_actions(actions)

self.current = set(current_state)
self.key = str(sorted(self.current))
self.avaliable_actions = self.__filter_avaliable_actions(actions)
self.identifier = name
self.parent = parent

def successors(self, actions: dict[str, dict[str, Set[int]]]) -> Generator['BlocksWorldState', None, None]:
for name, action in self.avaliable_actions.items():
new_state = self.__expand(name, action, actions)
yield new_state
def successors(self, actions: dict[str, dict[str, Set[int]]]) \
-> Generator['BlocksWorldState', None, None]:

for name, masks in self.actions_masks.items():
pre_mask, add_mask, del_mask = masks
if np.all(self.state_vec[pre_mask]):
yield self.__expand(name, add_mask, del_mask, actions)

def __expand(self, action_name: str,
add_mask: np.ndarray,
del_mask: np.ndarray,
actions: dict[str, dict[str, Set[int]]]) -> 'BlocksWorldState':

new_vec = self.state_vec.copy()
new_vec[del_mask] = False
new_vec[add_mask] = True
new_state_set = {i for i in range(len(new_vec)) if new_vec[i]}

return BlocksWorldState(new_state_set, actions,
action_name, parent=self)

def __convert_actions(self, actions: dict[str, dict[str, Set[int]]]):

converted = {}

for name, cond in actions.items():

pre = cond['pre']
post = cond['post']

max_index = self.n_props

def __expand(self, action_name: str, action: dict[str, Set[int]], actions: dict[str, dict[str, Set[int]]]) -> BlocksWorldState:
transition_state = self.current - action['pre']
new_state = self.__resolve_consistent_state(
transition_state, action['post'])
pre_mask = np.zeros(max_index, dtype=np.bool_)
add_mask = np.zeros(max_index, dtype=np.bool_)
del_mask = np.zeros(max_index, dtype=np.bool_)

return BlocksWorldState(new_state, actions, action_name, parent=self)
for p in pre:
if p > 0:
pre_mask[p] = True

def __filter_avaliable_actions(self, actions: dict[str, dict[str, Set[int]]]) -> dict[str, dict[str, Set[int]]]:
return {
name: condition for name, condition in actions.items()
if condition['pre'].issubset(self.current)
}
for p in post:
if p > 0:
add_mask[p] = True
else:
del_mask[abs(p)] = True

def __resolve_consistent_state(self, transition_state: Set[int], post_state: Set[int]) -> Set[int]:
only_positive_facts = {fact for fact in post_state if fact > 0}
return transition_state.union(only_positive_facts)
converted[name] = (pre_mask, add_mask, del_mask)

def __is_valid_state(self, state: Set[int]) -> bool:
return len(state) == len({abs(fact) for fact in state})
return converted

def __hash__(self) -> int:
return hash(self.key)
Expand All @@ -44,4 +77,5 @@ def __eq__(self, other: object) -> bool:
return isinstance(other, BlocksWorldState) and self.key == other.key

def __repr__(self) -> str:
return f'State({self.current})'
true_indices = [i for i in range(len(self.state_vec)) if self.state_vec[i]]
return f"State({true_indices})"
94 changes: 49 additions & 45 deletions src/domain/planning.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Generator, Set
import time
import tracemalloc

from src.support.factories.algorithm_factory import AlgorithmFactory
from src.domain.contracts.local_search_algorithm import LocalSearchAlgorithm
from src.domain.contracts.planning_contract import PlanningContract
Expand All @@ -9,16 +10,17 @@


class Planning(PlanningContract):

def __init__(self, strips: StripsNotation):
self.__map = self.__map_clauses(strips)
self.__actions = self.__resolve_actions(strips.actions)

states = strips.states
self.__initial_state = self.__resolve_facts(states['initial'])
self.__goal_state = self.__resolve_facts(states['goal'])

self.__state_space = BlocksWorldState(
self.__initial_state, self.__actions)
self.__initial_state,
self.__actions
)

self.__planner: LocalSearchAlgorithm | None = None

Expand All @@ -37,80 +39,82 @@ def actions(self) -> dict[str, dict[str, Set[int]]]:
@property
def states(self) -> dict[str, Set[int]]:
return {
'initial': self.__initial_state,
'goal': self.__goal_state,
"initial": self.__initial_state,
"goal": self.__goal_state
}

def successors(self) -> Generator[BlocksWorldState, None, None]:
return self.__state_space.successors(self.__actions)

def solution(self, goal_state: BlocksWorldState) -> list[str]:
solution_path: list[str] = []

caminho: list[str] = []
while goal_state.parent is not None:
solution_path.insert(0, goal_state.identifier)
caminho.insert(0, goal_state.identifier)
goal_state = goal_state.parent
return solution_path
return caminho

def set_algoritm(self, algorithm_key: str) -> None:
self.__planner = AlgorithmFactory.make(algorithm_key, self)
def set_algoritm(self, key: str) -> None:
self.__planner = AlgorithmFactory.make(key, self)

def execute(self) -> None:
if self.__planner is None:
raise AssertionError('The algorithm is not set')
raise AssertionError("Algorithm not set")

tracemalloc.start()
start = time.perf_counter()
result, expansions, explorations = self.__planner.execute()
elapsed = time.perf_counter() - start
current, peak = tracemalloc.get_traced_memory()
inicio = time.perf_counter()

resultado, expandidos, explorados = self.__planner.execute()

tempo = time.perf_counter() - inicio
atual, pico = tracemalloc.get_traced_memory()
tracemalloc.stop()

self.__display_result(result, expansions, explorations, elapsed, current, peak)
self.__mostrar(resultado, expandidos, explorados, tempo, atual, pico)

def __display_result(self, result: list[str] | None, expansions: int, explorations: int, elapsed: float, current: int, peak: int) -> None:
algo_name = type(
self.__planner).__name__ if self.__planner is not None else None
def __mostrar(self, result, exp, explor, tempo, atual, pico):
nome_algo = type(self.__planner).__name__ if self.__planner else None

print("=" * 60)
print("Execution summary".center(60))
print("=" * 60)
print(f"Algorithm : {algo_name}")
print(f"Time elapsed : {elapsed:.6f} s")
print(f"Expanded nodes : {expansions}")
print(f"Explored nodes : {expansions}")
print(
f"Memory usage : current={current / 1024:.2f} KB; peak={peak / 1024:.2f} KB")
print(f"Algorithm : {nome_algo}")
print(f"Time elapsed : {tempo:.6f} s")
print(f"Expanded nodes : {exp}")
print(f"Explored nodes : {explor}")
print(f"Memory usage : current={atual/1024:.2f} KB; peak={pico/1024:.2f} KB")
print("-" * 60)

if result:
print(
f"Solution ({len(result)} step{'s' if len(result) != 1 else ''}):")
for i, step in enumerate(result, start=1):
print(f" {i:2d}. {step}")
print(f"Solution ({len(result)} steps):")
for i, step in enumerate(result, 1):
print(f"{i:2d}. {step}")
else:
print("No solution found")

print("=" * 60)

def __map_clauses(self, strips: StripsNotation) -> dict[str, int]:
action_hook: dict[str, int] = {}
tabela: dict[str, int] = {}
contador = 1

for fact in strips.avaliable_facts:
if fact not in action_hook.keys():
action_hook[fact] = abs(action_hook.get(f'~{fact}', len(
action_hook) + 1)) if fact.startswith('~') is False else -action_hook.get(
fact[1:], len(action_hook) + 1)
return action_hook
if fact not in tabela:
tabela[fact] = contador
complemento = "~" + fact
if complemento not in tabela:
tabela[complemento] = -contador
contador += 1

def __resolve_facts(self, target: list[str]) -> Set[int]:
return {self.__map[fact] for fact in target}
return tabela

def __resolve_actions(self, actions: dict[str, dict[str, list[str]]]) -> dict[str, dict[str, Set[int]]]:
return {
action_name: {
'pre': self.__resolve_facts(conditions['pre']),
'post': self.__resolve_facts(conditions['post']),
def __resolve_facts(self, target: list[str]) -> Set[int]:
return {self.__map[p] for p in target}

def __resolve_actions(self, actions: dict[str, dict[str, list[str]]]):
convertido = {}
for nome, cond in actions.items():
convertido[nome] = {
"pre": self.__resolve_facts(cond["pre"]),
"post": self.__resolve_facts(cond["post"]),
}
for action_name, conditions in actions.items()
}
return convertido