Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.md text working-tree-encoding=UTF-8
*.rst text working-tree-encoding=UTF-8
20 changes: 10 additions & 10 deletions docs/source/model_steps.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ Source : https://github.com/mobility-team/mobility/issues/145#issuecomment-32280
Le fonctionnement actuel est le suivant :

Initialisation :
- Génération des séquences de motifs de déplacement dans chaque zone de transport, selon le profil de la population résidente (CSP, nombre de voitures du ménage, type de catégorie urbaine de la commune), et des besoins en heures d'activité pour chaque étape des séquences.
- Calcul des opportunités disponibles (=heures d'activités disponibles) par motif, pour chaque zone de transport.
- Génération des séquences de motifs de déplacement dans chaque zone de transport, selon le profil de la population résidente (CSP, nombre de voitures du ménage, type de catégorie urbaine de la commune), et des besoins en heures d'activité pour chaque étape des séquences.
- Calcul des opportunités disponibles (=heures d'activités disponibles) par motif, pour chaque zone de transport.

Boucle :
- Calcul des coûts généralisés de transport pour chaque couple motif - origine - destination (sans congestion pour la première itération).
- Calcul des probabilités de choisir une destination en fonction du motif et de l'origine du déplacement ainsi que du lieu de résidence des personnes.
- Echantillonnage d'une séquence de destinations pour chaque séquence de motifs, zone de transport de résidence et CSP.
- Recherche des top k séquences de modes disponibles pour réaliser ces séquences de déplacements (k<=10)
- Calcul des flux résultants par OD et par mode, puis recalcul des coûts généralisés.
- Calcul d'une part de personnes qui vont changer d'assignation séquence de motifs + modes (en fonction de la saturation des opportunités à destination, de possibilités d'optimisation comparatives, et d'une part de changements aléatoires).
- Calcul des opportunités restantes à destination.
- Recommencement de la procédure avec cette part de personnes non assignées.
- Calcul des coûts généralisés de transport pour chaque couple motif - origine - destination (sans congestion pour la première itération).
- Calcul des probabilités de choisir une destination en fonction du motif et de l'origine du déplacement ainsi que du lieu de résidence des personnes.
- Echantillonnage d'une séquence de destinations pour chaque séquence de motifs, zone de transport de résidence et CSP.
- Recherche des top k séquences de modes disponibles pour réaliser ces séquences de déplacements (k<=10)
- Calcul des flux résultants par OD et par mode, puis recalcul des coûts généralisés.
- Calcul d'une part de personnes qui vont changer d'assignation séquence de motifs + modes (en fonction de la saturation des opportunités à destination, de possibilités d'optimisation comparatives, et d'une part de changements aléatoires).
- Calcul des opportunités restantes à destination.
- Recommencement de la procédure avec cette part de personnes non assignées.
3 changes: 2 additions & 1 deletion mobility/choice_models/population_trips.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,8 @@ def run_model(self, is_weekday):
iteration,
parameters.n_iter_per_cost_update,
current_states_steps,
costs_aggregator
costs_aggregator,
run_key=self.inputs_hash
)

remaining_sinks = self.state_updater.get_new_sinks(
Expand Down
12 changes: 8 additions & 4 deletions mobility/choice_models/state_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ def get_current_states_steps(self, current_states, possible_states_steps):



def get_new_costs(self, costs, iteration, n_iter_per_cost_update, current_states_steps, costs_aggregator):
def get_new_costs(self, costs, iteration, n_iter_per_cost_update, current_states_steps, costs_aggregator, run_key=None):
"""Optionally recompute congested costs from current flows.

Aggregates OD flows by mode, updates network/user-equilibrium in the
Expand Down Expand Up @@ -496,8 +496,12 @@ def get_new_costs(self, costs, iteration, n_iter_per_cost_update, current_states
flow_volume=pl.col("n_persons").sum()
)
)

costs_aggregator.update(od_flows_by_mode)

has_congestion = any(getattr(m, "congestion", False) for m in costs_aggregator.modes)

# Only build/update congestion snapshots when at least one mode handles congestion.
if has_congestion:
costs_aggregator.update(od_flows_by_mode, run_key=run_key, iteration=iteration)
costs = costs_aggregator.get(congestion=True)

return costs
Expand Down Expand Up @@ -572,4 +576,4 @@ def get_new_sinks(

)

return remaining_sinks
return remaining_sinks
19 changes: 16 additions & 3 deletions mobility/choice_models/travel_costs_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def get_prob_by_od_and_mode(self, metrics: List, congestion: bool):
return prob


def update(self, od_flows_by_mode):
def update(self, od_flows_by_mode, run_key=None, iteration=None):

logging.info("Updating travel costs given OD flows...")

Expand Down Expand Up @@ -218,6 +218,19 @@ def update(self, od_flows_by_mode):

raise ValueError("No flow volume to vehicle volume model for mode : " + mode.name)

mode.travel_costs.update(flows)
flow_asset = None
if run_key is not None and iteration is not None:
# Persist vehicle flows as a first-class asset so downstream congestion
# snapshots are isolated per run/iteration and safe for parallel runs.
from mobility.transport_costs.od_flows_asset import VehicleODFlowsAsset
flow_asset = VehicleODFlowsAsset(
flows.to_pandas(),
run_key=str(run_key),
iteration=int(iteration),
mode_name=str(mode.name)
)
flow_asset.get()

mode.travel_costs.update(flows, flow_asset=flow_asset)



6 changes: 6 additions & 0 deletions mobility/experiments/hash_stability/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM python:3.12-slim
ARG POLARS_VERSION=1.37.1
WORKDIR /app
RUN pip install --no-cache-dir polars==${POLARS_VERSION}
COPY hash_stability.py /app/hash_stability.py
CMD ["python", "hash_stability.py"]
40 changes: 40 additions & 0 deletions mobility/experiments/hash_stability/hash_stability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import json, platform, sys
import polars as pl

SEED = 12345

# Replace this with your real df if you want:
df = pl.DataFrame(
{
"demand_group_id": pl.Series([13, 25, 16, 16, 9, 2029, 2028, 2032, 2030, 2029], dtype=pl.UInt32),
"home_zone_id": pl.Series([1, 1, 1, 1, 1, 77, 77, 77, 77, 77], dtype=pl.Int32),
"motive_seq_id": pl.Series([241, 241, 215, 228, 143, 237, 235, 227, 215, 241], dtype=pl.UInt32),
"motive": pl.Series(
["work"] * 10,
dtype=pl.Enum(["home", "other", "studies", "work"]),
),
"to": pl.Series([76, 76, 76, 76, 76, 63, 63, 63, 63, 63], dtype=pl.Int32),
"p_ij": pl.Series([0.185129]*5 + [0.010314]*5, dtype=pl.Float64),
}
)

hashes = (
df.select(
pl.struct(["demand_group_id", "motive_seq_id", "motive", "to"])
.hash(seed=SEED)
.alias("h")
)["h"]
.to_list()
)

payload = {
"polars_version": pl.__version__,
"python_version": sys.version.split()[0],
"machine": platform.machine(),
"platform": platform.platform(),
"seed": SEED,
"hashes": hashes,
}

print("CURRENT:")
print(json.dumps(payload, indent=2))
42 changes: 42 additions & 0 deletions mobility/transport_costs/od_flows_asset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
import pathlib
import pandas as pd

from mobility.file_asset import FileAsset


class VehicleODFlowsAsset(FileAsset):
"""Persist vehicle OD flows for congestion as a first-class FileAsset.

This intentionally stores only what the congestion builder needs:
["from","to","vehicle_volume"].

The cache key is (run_key, iteration, mode_name), where run_key should be
PopulationTrips.inputs_hash (includes the seed).
"""

def __init__(self, vehicle_od_flows: pd.DataFrame, *, run_key: str, iteration: int, mode_name: str):
inputs = {
"run_key": str(run_key),
"iteration": int(iteration),
"mode_name": str(mode_name),
"schema_version": 1
}
folder_path = pathlib.Path(os.environ["MOBILITY_PROJECT_DATA_FOLDER"])
cache_path = folder_path / "od_flows" / f"vehicle_od_flows_{mode_name}.parquet"

self._vehicle_od_flows = vehicle_od_flows
super().__init__(inputs, cache_path)

def get_cached_asset(self) -> pd.DataFrame:
return pd.read_parquet(self.cache_path)

def create_and_get_asset(self) -> pd.DataFrame:
self.cache_path.parent.mkdir(parents=True, exist_ok=True)

# Ensure the file always exists and has the expected schema, even if empty.
df = self._vehicle_od_flows
expected_cols = ["from", "to", "vehicle_volume"]
df = df[expected_cols] if all(c in df.columns for c in expected_cols) else df
df.to_parquet(self.cache_path, index=False)
return df
58 changes: 51 additions & 7 deletions mobility/transport_costs/path_travel_costs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(
osm_capacity_parameters: OSMCapacityParameters,
congestion: bool = False,
congestion_flows_scaling_factor: float = 1.0,
speed_modifiers: List[SpeedModifier] = []
speed_modifiers: List[SpeedModifier] = [],
):
"""
Initializes a TravelCosts object with the given transport zones and travel mode.
Expand Down Expand Up @@ -81,6 +81,10 @@ def __init__(

super().__init__(inputs, cache_path)

# When congestion updates are used, we keep a pointer to the latest
# per-iteration snapshot so `get(congestion=True)` is isolated per run.
self._current_congested_snapshot = None

def get_cached_asset(self, congestion: bool = False) -> pd.DataFrame:
"""
Retrieves the travel costs DataFrame from the cache.
Expand All @@ -92,7 +96,11 @@ def get_cached_asset(self, congestion: bool = False) -> pd.DataFrame:
if congestion is False:
path = self.cache_path["freeflow"]
else:
path = self.cache_path["congested"]
if self._current_congested_snapshot is not None:
return self._current_congested_snapshot.get()
# If no congestion snapshot has been applied in this run, treat
# "congested" as free-flow to avoid reusing stale shared caches.
path = self.cache_path["freeflow"]

logging.info("Travel costs already prepared. Reusing the file : " + str(path))
costs = pd.read_parquet(path)
Expand All @@ -117,7 +125,11 @@ def create_and_get_asset(self, congestion: bool = False) -> pd.DataFrame:
if congestion is False:
output_path = self.cache_path["freeflow"]
else:
output_path = self.cache_path["congested"]
if self._current_congested_snapshot is not None:
return self._current_congested_snapshot.get()
# Same rationale as get_cached_asset(): without an applied snapshot,
# compute free-flow costs.
output_path = self.cache_path["freeflow"]

costs = self.compute_costs_by_OD(self.transport_zones, self.contracted_path_graph, output_path)

Expand Down Expand Up @@ -163,10 +175,42 @@ def compute_costs_by_OD(
return costs


def update(self, od_flows):

self.contracted_path_graph.update(od_flows)
self.create_and_get_asset(congestion=True)
def update(self, od_flows, flow_asset=None):
"""Update congestion state.

Legacy behavior (flow_asset is None) mutates the shared congested graph/costs.
New behavior (flow_asset provided) builds isolated per-iteration snapshot assets
and switches `get(congestion=True)` to use that snapshot.
"""

if flow_asset is None:
self.contracted_path_graph.update(od_flows)
self._current_congested_snapshot = None
self.create_and_get_asset(congestion=True)
return

# Snapshot path: build a congested graph/costs variant keyed by flow_asset.
from mobility.transport_graphs.congested_path_graph_snapshot import CongestedPathGraphSnapshot
from mobility.transport_graphs.contracted_path_graph_snapshot import ContractedPathGraphSnapshot
from mobility.transport_costs.path_travel_costs_snapshot import PathTravelCostsSnapshot

congested_graph = CongestedPathGraphSnapshot(
modified_graph=self.modified_path_graph,
transport_zones=self.transport_zones,
vehicle_flows=flow_asset,
congestion_flows_scaling_factor=self.congested_path_graph.congestion_flows_scaling_factor,
)
contracted_graph = ContractedPathGraphSnapshot(congested_graph)

snapshot = PathTravelCostsSnapshot(
mode_name=self.mode_name,
transport_zones=self.transport_zones,
routing_parameters=self.routing_parameters,
contracted_graph=contracted_graph,
)

self._current_congested_snapshot = snapshot
snapshot.get()

def clone(self):

Expand Down
64 changes: 64 additions & 0 deletions mobility/transport_costs/path_travel_costs_snapshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
import pathlib
import logging
import pandas as pd

from importlib import resources

from mobility.file_asset import FileAsset
from mobility.r_utils.r_script import RScript
from mobility.transport_zones import TransportZones
from mobility.path_routing_parameters import PathRoutingParameters
from mobility.transport_graphs.contracted_path_graph_snapshot import ContractedPathGraphSnapshot


class PathTravelCostsSnapshot(FileAsset):
"""A per-run/iteration travel-cost snapshot based on a contracted graph snapshot."""

def __init__(
self,
*,
mode_name: str,
transport_zones: TransportZones,
routing_parameters: PathRoutingParameters,
contracted_graph: ContractedPathGraphSnapshot,
):
inputs = {
"mode_name": str(mode_name),
"transport_zones": transport_zones,
"routing_parameters": routing_parameters,
"contracted_graph": contracted_graph,
"schema_version": 1,
}

folder_path = pathlib.Path(os.environ["MOBILITY_PROJECT_DATA_FOLDER"])
cache_path = folder_path / f"travel_costs_congested_{mode_name}.parquet"
super().__init__(inputs, cache_path)

def get_cached_asset(self) -> pd.DataFrame:
logging.info("Congested travel costs snapshot already prepared. Reusing: " + str(self.cache_path))
return pd.read_parquet(self.cache_path)

def create_and_get_asset(self) -> pd.DataFrame:
logging.info("Computing congested travel costs snapshot...")

transport_zones: TransportZones = self.inputs["transport_zones"]
contracted_graph: ContractedPathGraphSnapshot = self.inputs["contracted_graph"]
routing_parameters: PathRoutingParameters = self.inputs["routing_parameters"]

transport_zones.get()
contracted_graph.get()

script = RScript(resources.files('mobility.r_utils').joinpath('prepare_dodgr_costs.R'))
script.run(
args=[
str(transport_zones.cache_path),
str(contracted_graph.cache_path),
str(routing_parameters.filter_max_speed),
str(routing_parameters.filter_max_time),
str(self.cache_path),
]
)

return pd.read_parquet(self.cache_path)

17 changes: 12 additions & 5 deletions mobility/transport_graphs/congested_path_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ def get_cached_asset(self) -> pathlib.Path:

return self.cache_path

def create_and_get_asset(self, enable_congestion: bool = False) -> pathlib.Path:
def create_and_get_asset(self, enable_congestion: bool = False, flows_file_path: pathlib.Path | None = None) -> pathlib.Path:

logging.info("Loading graph with traffic...")
if flows_file_path is None:
flows_file_path = self.flows_file_path

self.load_graph(
self.modified_graph.get(),
self.transport_zones.cache_path,
enable_congestion,
self.flows_file_path,
flows_file_path,
self.congestion_flows_scaling_factor,
)

Expand Down Expand Up @@ -84,11 +86,16 @@ def load_graph(
return None


def update(self, od_flows):
def update(self, od_flows, flow_asset=None):

if self.handles_congestion is True:

od_flows.write_parquet(self.flows_file_path)
self.create_and_get_asset(enable_congestion=True)
if flow_asset is None:
od_flows.write_parquet(self.flows_file_path)
self.create_and_get_asset(enable_congestion=True)
else:
# flow_asset is expected to already be a parquet with the right schema.
flow_asset.get()
self.create_and_get_asset(enable_congestion=True, flows_file_path=flow_asset.cache_path)


Loading
Loading