diff --git a/.github/workflows/pr-test.yaml b/.github/workflows/pr-test.yaml index 4ebae9c..75c35da 100644 --- a/.github/workflows/pr-test.yaml +++ b/.github/workflows/pr-test.yaml @@ -6,7 +6,28 @@ on: workflow_dispatch: jobs: + regression-guards: + runs-on: ubuntu-latest + timeout-minutes: 10 + permissions: + contents: read + + steps: + - name: Check out repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Run regression guard tests + run: | + set -euo pipefail + python -m unittest discover -s test -p 'test_regression_bugfixes.py' + docker-test: + needs: regression-guards runs-on: ubuntu-latest timeout-minutes: 60 permissions: diff --git a/docs/2026-02-10-bugfixes.md b/docs/2026-02-10-bugfixes.md new file mode 100644 index 0000000..a732426 --- /dev/null +++ b/docs/2026-02-10-bugfixes.md @@ -0,0 +1,46 @@ +# Bugfixes (2026-02-10) + +## Scope + +Applied fixes in `slide2vec/` only. +No edits were made in `slide2vec/hs2p` (submodule). + +## Changes + +- Fixed region-level model factory wiring so all supported region models initialize `tile_encoder` before wrapping with `RegionFeatureExtractor`: + - `slide2vec/models/models.py` +- Fixed subprocess process-group handling for CTRL+C termination: + - use `start_new_session=True` in child `Popen` + - keep `killpg` semantics safe + - `slide2vec/main.py` +- Improved CLI path robustness in orchestration: + - invoke embed/aggregate via modules (`-m slide2vec.embed`, `-m slide2vec.aggregate`) + - resolve hs2p path from package location + - `slide2vec/main.py` +- Fixed direct-script output path handling: + - avoid `Path(cfg.output_dir, None)` when `--output-dir` is not provided + - `slide2vec/embed.py` + - `slide2vec/aggregate.py` +- Reduced distributed deadlock risk in embedding: + - moved rank synchronization outside `try` block + - added rank-failure synchronization and propagation + - clean tmp feature shards on failure + - `slide2vec/embed.py` + +## Regression Tests + +Added deterministic source-level regression tests: + +- `test/test_regression_bugfixes.py` + +Covered checks: + +- child process session isolation for `killpg` +- safe output-dir composition +- no `barrier()` calls inside `try` blocks in per-slide embed loop +- correct region-model `tile_encoder` assignments + +## hs2p Suggestions (no edits applied) + +- Consider converting `hs2p` script invocation in orchestration to module entry points (mirroring embed/aggregate) to reduce path coupling. +- Consider adding a small smoke test for resumed runs to validate output directory nesting behavior (`resume` vs `skip-datetime` combinations). diff --git a/slide2vec/aggregate.py b/slide2vec/aggregate.py index 9e5f5f7..b980f8c 100644 --- a/slide2vec/aggregate.py +++ b/slide2vec/aggregate.py @@ -56,11 +56,20 @@ def scale_coordinates(wsi_fp, coordinates, spacing, backend): return scaled_coordinates +def resolve_output_dir(config_output_dir: str, cli_output_dir: str | None) -> Path: + if cli_output_dir is None: + return Path(config_output_dir) + cli_path = Path(cli_output_dir) + if cli_path.is_absolute(): + return cli_path + return Path(config_output_dir, cli_output_dir) + + def main(args): # setup configuration run_on_cpu = args.run_on_cpu cfg = get_cfg_from_file(args.config_file) - output_dir = Path(cfg.output_dir, args.output_dir) + output_dir = resolve_output_dir(cfg.output_dir, args.output_dir) cfg.output_dir = str(output_dir) coordinates_dir = Path(cfg.output_dir, "coordinates") diff --git a/slide2vec/embed.py b/slide2vec/embed.py index 50e498a..89b3acf 100644 --- a/slide2vec/embed.py +++ b/slide2vec/embed.py @@ -144,11 +144,27 @@ def load_sort_and_deduplicate_features(tmp_dir, name, expected_len=None): return features_unique +def resolve_output_dir(config_output_dir: str, cli_output_dir: str | None) -> Path: + if cli_output_dir is None: + return Path(config_output_dir) + cli_path = Path(cli_output_dir) + if cli_path.is_absolute(): + return cli_path + return Path(config_output_dir, cli_output_dir) + + +def cleanup_tmp_features(tmp_dir: Path, name: str): + for rank in range(distributed.get_global_size()): + fp = tmp_dir / f"{name}-rank_{rank}.h5" + if fp.exists(): + os.remove(fp) + + def main(args): # setup configuration run_on_cpu = args.run_on_cpu cfg = get_cfg_from_file(args.config_file) - output_dir = Path(cfg.output_dir, args.output_dir) + output_dir = resolve_output_dir(cfg.output_dir, args.output_dir) cfg.output_dir = str(output_dir) if not run_on_cpu: @@ -250,6 +266,14 @@ def main(args): disable=not distributed.is_main_process(), position=1, ): + name = wsi_fp.stem.replace(" ", "_") + feature_path = features_dir / f"{name}.pt" + if cfg.model.save_tile_embeddings: + feature_path = features_dir / f"{name}-tiles.pt" + tmp_feature_path = tmp_dir / f"{name}-rank_{distributed.get_global_rank()}.h5" + + status_info = {"status": "success"} + local_failed = False try: dataset = create_dataset( wsi_path=wsi_fp, @@ -280,12 +304,6 @@ def main(args): pin_memory=True, ) - name = wsi_fp.stem.replace(" ", "_") - feature_path = features_dir / f"{name}.pt" - if cfg.model.save_tile_embeddings: - feature_path = features_dir / f"{name}-tiles.pt" - tmp_feature_path = tmp_dir / f"{name}-rank_{distributed.get_global_rank()}.h5" - # get feature dimension and dtype using a dry run with torch.inference_mode(), autocast_context: sample_batch = next(iter(dataloader)) @@ -307,30 +325,75 @@ def main(args): run_on_cpu, ) - if not run_on_cpu: - torch.distributed.barrier() + except Exception as e: + local_failed = True + status_info = { + "status": "failed", + "error": str(e), + "traceback": str(traceback.format_exc()), + } + any_rank_failed = local_failed + if not run_on_cpu: + # Ensure every rank reaches sync points, even when one rank failed. + torch.distributed.barrier() + failure_flag = torch.tensor( + 1 if local_failed else 0, device=model.device, dtype=torch.int32 + ) + torch.distributed.all_reduce( + failure_flag, op=torch.distributed.ReduceOp.MAX + ) + any_rank_failed = bool(failure_flag.item()) + + if any_rank_failed: if distributed.is_main_process(): - wsi_feature = load_sort_and_deduplicate_features(tmp_dir, name, expected_len=len(dataset)) + cleanup_tmp_features(tmp_dir, name) + if status_info["status"] != "failed": + status_info = { + "status": "failed", + "error": "Feature extraction failed on at least one distributed rank.", + "traceback": "", + } + elif distributed.is_main_process(): + try: + wsi_feature = load_sort_and_deduplicate_features( + tmp_dir, name, expected_len=len(dataset) + ) torch.save(wsi_feature, feature_path) - - # cleanup - del wsi_feature + except Exception as e: + any_rank_failed = True + cleanup_tmp_features(tmp_dir, name) + status_info = { + "status": "failed", + "error": str(e), + "traceback": str(traceback.format_exc()), + } + finally: + if "wsi_feature" in locals(): + del wsi_feature if not run_on_cpu: torch.cuda.empty_cache() gc.collect() - if not run_on_cpu: - torch.distributed.barrier() - - feature_extraction_updates[str(wsi_fp)] = {"status": "success"} + if not run_on_cpu: + # Propagate post-processing failures from rank 0 to all ranks. + failure_flag = torch.tensor( + 1 if (distributed.is_main_process() and any_rank_failed) else 0, + device=model.device, + dtype=torch.int32, + ) + torch.distributed.broadcast(failure_flag, src=0) + torch.distributed.barrier() + any_rank_failed = bool(failure_flag.item()) - except Exception as e: - feature_extraction_updates[str(wsi_fp)] = { - "status": "failed", - "error": str(e), - "traceback": str(traceback.format_exc()), - } + if distributed.is_main_process(): + if any_rank_failed and status_info["status"] != "failed": + status_info = { + "status": "failed", + "error": "Feature extraction failed on at least one distributed rank.", + "traceback": "", + } + feature_extraction_updates[str(wsi_fp)] = status_info # update process_df if distributed.is_main_process(): diff --git a/slide2vec/main.py b/slide2vec/main.py index 8b94b14..d93c4ea 100644 --- a/slide2vec/main.py +++ b/slide2vec/main.py @@ -12,6 +12,8 @@ from slide2vec.utils.config import hf_login, setup +PACKAGE_ROOT = Path(__file__).resolve().parent + def get_args_parser(add_help: bool = True): parser = argparse.ArgumentParser("slide2vec", add_help=add_help) @@ -91,7 +93,8 @@ def run_feature_extraction(config_file, output_dir, run_on_cpu: False): "torch.distributed.run", f"--master_port={free_port}", "--nproc_per_node=gpu", - "slide2vec/embed.py", + "-m", + "slide2vec.embed", "--config-file", os.path.abspath(config_file), "--output-dir", @@ -100,7 +103,8 @@ def run_feature_extraction(config_file, output_dir, run_on_cpu: False): if run_on_cpu: cmd = [ sys.executable, - "slide2vec/embed.py", + "-m", + "slide2vec.embed", "--config-file", os.path.abspath(config_file), "--output-dir", @@ -108,7 +112,7 @@ def run_feature_extraction(config_file, output_dir, run_on_cpu: False): "--run-on-cpu", ] # launch in its own process group. - proc = subprocess.Popen(cmd) + proc = subprocess.Popen(cmd, start_new_session=True) try: proc.wait() except KeyboardInterrupt: @@ -126,7 +130,8 @@ def run_feature_aggregation(config_file, output_dir, run_on_cpu: False): # find a free port cmd = [ sys.executable, - "slide2vec/aggregate.py", + "-m", + "slide2vec.aggregate", "--config-file", os.path.abspath(config_file), "--output-dir", @@ -135,7 +140,7 @@ def run_feature_aggregation(config_file, output_dir, run_on_cpu: False): if run_on_cpu: cmd.append("--run-on-cpu") # launch in its own process group. - proc = subprocess.Popen(cmd) + proc = subprocess.Popen(cmd, start_new_session=True) try: proc.wait() except KeyboardInterrupt: @@ -156,7 +161,7 @@ def main(args): hf_login() - root_dir = "slide2vec/hs2p" + root_dir = PACKAGE_ROOT / "hs2p" if cfg.resume: # need to remove the dirname to avoid nested output directories hs2p_output_dir = output_dir.parent diff --git a/slide2vec/models/models.py b/slide2vec/models/models.py index b54f413..0f835c8 100644 --- a/slide2vec/models/models.py +++ b/slide2vec/models/models.py @@ -92,17 +92,17 @@ def __init__( elif options.name == "h-optimus-1": tile_encoder = Hoptimus1() elif options.name == "conch": - model = CONCH() + tile_encoder = CONCH() elif options.name == "musk": - model = MUSK() + tile_encoder = MUSK() elif options.name == "phikonv2": - model = PhikonV2() + tile_encoder = PhikonV2() elif options.name == "hibou": - model = Hibou() + tile_encoder = Hibou() elif options.name == "kaiko": - model = Kaiko(arch=options.arch) + tile_encoder = Kaiko(arch=options.arch) elif options.name == "kaiko-midnight": - model = Midnight12k() + tile_encoder = Midnight12k() elif options.name == "rumc-vit-s-50k": tile_encoder = CustomViT( arch="vit_small", diff --git a/test/test_regression_bugfixes.py b/test/test_regression_bugfixes.py new file mode 100644 index 0000000..0dbe4e1 --- /dev/null +++ b/test/test_regression_bugfixes.py @@ -0,0 +1,113 @@ +import ast +import re +import unittest +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] + + +def read_source(rel_path: str) -> str: + return (ROOT / rel_path).read_text(encoding="utf-8") + + +def parse_source(rel_path: str) -> ast.AST: + return ast.parse(read_source(rel_path)) + + +class RegressionBugfixTests(unittest.TestCase): + def test_main_uses_dedicated_process_groups_for_children(self): + tree = parse_source("slide2vec/main.py") + functions = { + node.name: node + for node in tree.body + if isinstance(node, ast.FunctionDef) + } + for fn_name in ("run_feature_extraction", "run_feature_aggregation"): + fn = functions[fn_name] + popen_calls = [ + call + for call in ast.walk(fn) + if isinstance(call, ast.Call) + and isinstance(call.func, ast.Attribute) + and isinstance(call.func.value, ast.Name) + and call.func.value.id == "subprocess" + and call.func.attr == "Popen" + ] + self.assertTrue(popen_calls, f"No subprocess.Popen call found in {fn_name}") + for call in popen_calls: + kws = {kw.arg: kw.value for kw in call.keywords} + self.assertIn( + "start_new_session", + kws, + f"{fn_name} must set start_new_session=True for safe killpg", + ) + self.assertIsInstance(kws["start_new_session"], ast.Constant) + self.assertTrue(kws["start_new_session"].value) + + def test_embed_and_aggregate_do_not_join_path_with_none(self): + for rel_path in ("slide2vec/embed.py", "slide2vec/aggregate.py"): + tree = parse_source(rel_path) + bad_calls = [] + for node in ast.walk(tree): + if ( + isinstance(node, ast.Call) + and isinstance(node.func, ast.Name) + and node.func.id == "Path" + and len(node.args) >= 2 + ): + second = node.args[1] + if ( + isinstance(second, ast.Attribute) + and isinstance(second.value, ast.Name) + and second.value.id == "args" + and second.attr == "output_dir" + ): + bad_calls.append(node) + self.assertFalse( + bad_calls, + f"{rel_path} should not call Path(cfg.output_dir, args.output_dir) directly", + ) + + def test_embed_has_no_barrier_calls_inside_try_block(self): + tree = parse_source("slide2vec/embed.py") + try_nodes = [node for node in ast.walk(tree) if isinstance(node, ast.Try)] + self.assertTrue(try_nodes, "Expected at least one try/except block") + + barrier_calls_inside_try = [] + for try_node in try_nodes: + for stmt in try_node.body: + for node in ast.walk(stmt): + if ( + isinstance(node, ast.Call) + and isinstance(node.func, ast.Attribute) + and node.func.attr == "barrier" + ): + barrier_calls_inside_try.append(node) + + self.assertFalse( + barrier_calls_inside_try, + "torch.distributed.barrier calls inside try blocks can deadlock when ranks diverge", + ) + + def test_region_model_factory_uses_tile_encoder_assignments(self): + src = read_source("slide2vec/models/models.py") + expected = { + "conch": "tile_encoder = CONCH()", + "musk": "tile_encoder = MUSK()", + "phikonv2": "tile_encoder = PhikonV2()", + "hibou": "tile_encoder = Hibou()", + "kaiko": "tile_encoder = Kaiko(arch=options.arch)", + "kaiko-midnight": "tile_encoder = Midnight12k()", + } + for model_name, assignment in expected.items(): + pattern = rf'elif options.name == "{re.escape(model_name)}":\n\s+{re.escape(assignment)}' + self.assertRegex( + src, + pattern, + f"Region-level branch for {model_name} should assign to tile_encoder", + ) + + +if __name__ == "__main__": + unittest.main()