-
Notifications
You must be signed in to change notification settings - Fork 0
chore(workspace): propagate unified make and release automation #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| 3.13 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,12 +12,19 @@ | |
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import os | ||
| import re | ||
| import subprocess | ||
| import sys | ||
| import tomllib | ||
| from pathlib import Path | ||
| from typing import Any | ||
|
|
||
| if str(Path(__file__).resolve().parents[2]) not in sys.path: | ||
| sys.path.insert(0, str(Path(__file__).resolve().parents[2])) | ||
|
|
||
| from libs.selection import resolve_projects | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P0: Module Prompt for AI agents |
||
|
|
||
| # Mypy output patterns for typing library detection (aligned with stub_supply_chain) | ||
| MYPY_HINT_RE = re.compile(r'note: Hint: "python3 -m pip install ([^"]+)"') | ||
| MYPY_STUB_RE = re.compile(r'Library stubs not installed for "([^"]+)"') | ||
|
|
@@ -62,22 +69,16 @@ def discover_projects( | |
| workspace_root: Path, | ||
| projects_filter: list[str] | None = None, | ||
| ) -> list[Path]: | ||
| """Discover all Python projects under workspace (top-level dirs with pyproject.toml). | ||
|
|
||
| Matches root Makefile / sync_dependencies: any dir with pyproject.toml, excluding SKIP_DIRS. | ||
| """ | ||
| projects: list[Path] = [] | ||
| for item in sorted(workspace_root.iterdir()): | ||
| if not item.is_dir(): | ||
| continue | ||
| if any(skip in item.name for skip in SKIP_DIRS): | ||
| continue | ||
| if not (item / "pyproject.toml").exists(): | ||
| continue | ||
| if projects_filter is not None and item.name not in projects_filter: | ||
| continue | ||
| projects.append(item) | ||
| return projects | ||
| projects = [ | ||
| project.path | ||
| for project in resolve_projects(workspace_root, names=[]) | ||
| if (project.path / "pyproject.toml").exists() | ||
| and not any(skip in project.name for skip in SKIP_DIRS) | ||
| ] | ||
| if projects_filter is not None: | ||
| filter_set = set(projects_filter) | ||
| projects = [path for path in projects if path.name in filter_set] | ||
| return sorted(projects) | ||
|
|
||
|
|
||
| def run_deptry( | ||
|
|
@@ -146,7 +147,7 @@ def run_pip_check(workspace_root: Path, venv_bin: Path) -> tuple[list[str], int] | |
| capture_output=True, | ||
| text=True, | ||
| timeout=60, | ||
| env={**subprocess.os.environ, "VIRTUAL_ENV": str(venv_bin.parent)}, | ||
| env={**os.environ, "VIRTUAL_ENV": str(venv_bin.parent)}, | ||
| ) | ||
| out = (result.stdout or "").strip().splitlines() if result.stdout else [] | ||
| return out, result.returncode | ||
|
|
@@ -219,9 +220,9 @@ def run_mypy_stub_hints( | |
| "--no-error-summary", | ||
| ] | ||
| env = { | ||
| **subprocess.os.environ, | ||
| **os.environ, | ||
| "VIRTUAL_ENV": str(venv_bin.parent), | ||
| "PATH": f"{venv_bin}:{subprocess.os.environ.get('PATH', '')}", | ||
| "PATH": f"{venv_bin}:{os.environ.get('PATH', '')}", | ||
| } | ||
| result = subprocess.run( | ||
| cmd, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,204 @@ | ||
| #!/usr/bin/env python3 | ||
| from __future__ import annotations | ||
|
|
||
| import argparse | ||
| import json | ||
| import subprocess | ||
| from pathlib import Path | ||
|
|
||
|
|
||
| def _run_capture(command: list[str], cwd: Path) -> str: | ||
| result = subprocess.run( | ||
| command, | ||
| cwd=cwd, | ||
| capture_output=True, | ||
| text=True, | ||
| check=False, | ||
| ) | ||
| if result.returncode != 0: | ||
| detail = (result.stderr or result.stdout).strip() | ||
| raise RuntimeError( | ||
| f"command failed ({result.returncode}): {' '.join(command)}: {detail}" | ||
| ) | ||
| return result.stdout.strip() | ||
|
|
||
|
|
||
| def _run_stream(command: list[str], cwd: Path) -> int: | ||
| result = subprocess.run(command, cwd=cwd, check=False) | ||
| return result.returncode | ||
|
|
||
|
|
||
| def _current_branch(repo_root: Path) -> str: | ||
| return _run_capture(["git", "rev-parse", "--abbrev-ref", "HEAD"], repo_root) | ||
|
|
||
|
|
||
| def _open_pr_for_head(repo_root: Path, head: str) -> dict[str, object] | None: | ||
| raw = _run_capture( | ||
| [ | ||
| "gh", | ||
| "pr", | ||
| "list", | ||
| "--state", | ||
| "open", | ||
| "--head", | ||
| head, | ||
| "--json", | ||
| "number,title,state,baseRefName,headRefName,url,isDraft", | ||
| "--limit", | ||
| "1", | ||
| ], | ||
| repo_root, | ||
| ) | ||
| payload = json.loads(raw) | ||
| if not payload: | ||
| return None | ||
| first = payload[0] | ||
| if not isinstance(first, dict): | ||
| return None | ||
| return first | ||
|
|
||
|
|
||
| def _print_status(repo_root: Path, base: str, head: str) -> int: | ||
| pr = _open_pr_for_head(repo_root, head) | ||
| print(f"repo={repo_root}") | ||
| print(f"base={base}") | ||
| print(f"head={head}") | ||
| if pr is None: | ||
| print("status=no-open-pr") | ||
| return 0 | ||
| print("status=open") | ||
| print(f"pr_number={pr.get('number')}") | ||
| print(f"pr_title={pr.get('title')}") | ||
| print(f"pr_url={pr.get('url')}") | ||
| print(f"pr_state={pr.get('state')}") | ||
| print(f"pr_draft={pr.get('isDraft')}") | ||
| return 0 | ||
|
|
||
|
|
||
| def _selector(pr_number: str, head: str) -> str: | ||
| return pr_number if pr_number else head | ||
|
|
||
|
|
||
| def _create_pr( | ||
| repo_root: Path, | ||
| base: str, | ||
| head: str, | ||
| title: str, | ||
| body: str, | ||
| draft: int, | ||
| ) -> int: | ||
| existing = _open_pr_for_head(repo_root, head) | ||
| if existing is not None: | ||
| print(f"status=already-open") | ||
| print(f"pr_url={existing.get('url')}") | ||
| return 0 | ||
|
|
||
| command = [ | ||
| "gh", | ||
| "pr", | ||
| "create", | ||
| "--base", | ||
| base, | ||
| "--head", | ||
| head, | ||
| "--title", | ||
| title, | ||
| "--body", | ||
| body, | ||
| ] | ||
| if draft == 1: | ||
| command.append("--draft") | ||
|
|
||
| created = _run_capture(command, repo_root) | ||
| print("status=created") | ||
| print(f"pr_url={created}") | ||
| return 0 | ||
|
|
||
|
|
||
| def _merge_pr( | ||
| repo_root: Path, | ||
| selector: str, | ||
| method: str, | ||
| auto: int, | ||
| delete_branch: int, | ||
| ) -> int: | ||
| command = ["gh", "pr", "merge", selector] | ||
| merge_flag = { | ||
| "merge": "--merge", | ||
| "rebase": "--rebase", | ||
| "squash": "--squash", | ||
| }.get(method, "--squash") | ||
| command.append(merge_flag) | ||
| if auto == 1: | ||
| command.append("--auto") | ||
| if delete_branch == 1: | ||
| command.append("--delete-branch") | ||
| exit_code = _run_stream(command, repo_root) | ||
| if exit_code == 0: | ||
| print("status=merged") | ||
| return exit_code | ||
|
|
||
|
|
||
| def _parse_args() -> argparse.Namespace: | ||
| parser = argparse.ArgumentParser() | ||
| _ = parser.add_argument("--repo-root", type=Path, default=Path(".")) | ||
| _ = parser.add_argument( | ||
| "--action", | ||
| default="status", | ||
| choices=["status", "create", "view", "checks", "merge", "close"], | ||
| ) | ||
| _ = parser.add_argument("--base", default="main") | ||
| _ = parser.add_argument("--head", default="") | ||
| _ = parser.add_argument("--number", default="") | ||
| _ = parser.add_argument("--title", default="") | ||
| _ = parser.add_argument("--body", default="") | ||
| _ = parser.add_argument("--draft", type=int, default=0) | ||
| _ = parser.add_argument("--merge-method", default="squash") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: The Prompt for AI agents |
||
| _ = parser.add_argument("--auto", type=int, default=0) | ||
| _ = parser.add_argument("--delete-branch", type=int, default=0) | ||
| _ = parser.add_argument("--checks-strict", type=int, default=0) | ||
| return parser.parse_args() | ||
|
|
||
|
|
||
| def main() -> int: | ||
| args = _parse_args() | ||
| repo_root = args.repo_root.resolve() | ||
| head = args.head or _current_branch(repo_root) | ||
| base = args.base | ||
| selector = _selector(args.number, head) | ||
|
|
||
| if args.action == "status": | ||
| return _print_status(repo_root, base, head) | ||
|
|
||
| if args.action == "create": | ||
| title = args.title or f"chore: sync {head}" | ||
| body = args.body or "Automated PR managed by scripts/github/pr_manager.py" | ||
| return _create_pr(repo_root, base, head, title, body, args.draft) | ||
|
|
||
| if args.action == "view": | ||
| return _run_stream(["gh", "pr", "view", selector], repo_root) | ||
|
|
||
| if args.action == "checks": | ||
| exit_code = _run_stream(["gh", "pr", "checks", selector], repo_root) | ||
| if exit_code != 0 and args.checks_strict == 0: | ||
| print("status=checks-nonblocking") | ||
| return 0 | ||
| return exit_code | ||
|
|
||
| if args.action == "merge": | ||
| return _merge_pr( | ||
| repo_root, | ||
| selector, | ||
| args.merge_method, | ||
| args.auto, | ||
| args.delete_branch, | ||
| ) | ||
|
|
||
| if args.action == "close": | ||
| return _run_stream(["gh", "pr", "close", selector], repo_root) | ||
|
|
||
| raise RuntimeError(f"unknown action: {args.action}") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| raise SystemExit(main()) | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P0: Import from non-existent module
libs.discovery— the module does not exist anywhere in the repository. This will cause aModuleNotFoundErrorat import time, making the entire script unusable. The old inlinediscover_projectsimplementation was removed, so there is no fallback. Ensurelibs/discovery.py(with adiscover_projectsfunction returning objects with.nameand.kindattributes) is added to the repository, or revert to the previous inline implementation.Prompt for AI agents