Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 62 additions & 15 deletions modules/create_environments/convert.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
# function for converting Flatland environment into a string of clingo-readable ASP facts

# def flip_y(height, y) -> int:
# """
# flips the value of the y along the axis for a given environment
# """
# return(height - y - 1)
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_env import RailEnvActions
from flatland.utils.rendertools import RenderTool, AgentRenderVariant


def convert_to_clingo(env) -> str:
Expand All @@ -15,6 +11,7 @@ def convert_to_clingo(env) -> str:
rail_map = env.rail.grid
height, width, agents = env.height, env.width, env.agents
clingo_str = f"% clingo representation of a Flatland environment\n% height: {height}, width: {width}, agents: {len(agents)}\n"
clingo_str += f"\nglobal({env._max_episode_steps}).\n"

# save start and end positions for each agent
dir_map = {0:"n", 1:"e", 2:"s", 3:"w"}
Expand All @@ -23,15 +20,15 @@ def convert_to_clingo(env) -> str:
init_y, init_x = agent_info.initial_position
goal_y, goal_x = agent_info.target
min_start, max_end = agent_info.earliest_departure, agent_info.latest_arrival

# flip the y-axis
# init_y = flip_y(height, init_y)
# goal_y = flip_y(height, goal_y)
speed = int(1/agent_info.speed_counter.speed) # inverse, e.g. 1/2 --> 2, 1/4 --> 4 etc.

direction = dir_map[agent_info.initial_direction]
clingo_str += f"\nagent({agent_num}). "
clingo_str += f"start(agent({agent_num}),({init_y},{init_x}),{min_start},dir({direction})). "
clingo_str += f"end(agent({agent_num}),({goal_y},{goal_x}),{max_end}).\n"
clingo_str += f"\ntrain({agent_num}). "
clingo_str += f"start({agent_num},({init_y},{init_x}),{min_start},{direction}). "
clingo_str += f"end({agent_num},({goal_y},{goal_x}),{max_end}). "
clingo_str += f"speed({agent_num},{speed}).\n"

clingo_str += "\n"

# create an atom for each cell in the environment
#row_num = len(rail_map) - 1
Expand All @@ -41,4 +38,54 @@ def convert_to_clingo(env) -> str:
#row_num -= 1
clingo_str+="\n"

return(clingo_str)
return(clingo_str)

def convert_formers_to_clingo(actions) -> str:
# change back to the clingo names
mapping = {RailEnvActions.MOVE_FORWARD:"move_forward", RailEnvActions.MOVE_RIGHT:"move_right", RailEnvActions.MOVE_LEFT:"move_left", RailEnvActions.STOP_MOVING:"wait"}
for index, dict in enumerate(actions):
for key in dict.keys():
actions[index][key] = mapping[actions[index][key]]

facts = []
# change from dictionary into facts
for index, dict in enumerate(actions):
for key in dict.keys():
facts.append(f':- not action(train({key}),{actions[index][key]},{index}).\n') #remove: can this be a list of strings or should it be one long string?

return(facts)


def convert_malfunctions_to_clingo(malfs, timestep) -> str:
#mapping = {RailEnvActions.MOVE_FORWARD:"move_forward", RailEnvActions.MOVE_RIGHT:"move_right", RailEnvActions.MOVE_LEFT:"move_left", RailEnvActions.STOP_MOVING:"wait"}
facts = []
for m in malfs:
train, duration = m[0], m[1]
facts.append(f'malfunction({train},{duration},{timestep}).\n')
for t in range(timestep+1, timestep+1+m[1]): # remove: make sure this duration should be included (aka remove +1 or keep it?)
facts.append(f':- not action(train({train}),wait,{t}).\n') #remove: can this be a list of strings or should it be one long string?

return(facts)


def convert_futures_to_clingo(actions) -> str:
# change back to the clingo names
mapping = {RailEnvActions.MOVE_FORWARD:"move_forward", RailEnvActions.MOVE_RIGHT:"move_right", RailEnvActions.MOVE_LEFT:"move_left", RailEnvActions.STOP_MOVING:"wait"}
for index, dict in enumerate(actions):
for key in dict.keys():
actions[index][key] = mapping[actions[index][key]]

facts = []
# change from dictionary into facts
for index, dict in enumerate(actions):
for key in dict.keys():
facts.append(f'planned_action(train({key}),{actions[index][key]},{index}).\n') #remove: can this be a list of strings or should it be one long string?

return(facts)

def convert_actions_to_flatland(actions) -> list:
mapping = {"move_forward":RailEnvActions.MOVE_FORWARD, "move_right":RailEnvActions.MOVE_RIGHT, "move_left":RailEnvActions.MOVE_LEFT, "wait":RailEnvActions.STOP_MOVING}
for index, dict in enumerate(actions):
for key in dict.keys():
actions[index][key] = mapping[actions[index][key]]
return(actions)