From d20d70b58b5ba427891ef92b3f597b8720fd5f1e Mon Sep 17 00:00:00 2001 From: zhangt Date: Tue, 16 Sep 2025 08:17:38 -0700 Subject: [PATCH 01/15] Init commit for docker env and simulator API --- .gitignore | 1 + Dockerfile | 17 ++-- backend_app/README.md | 4 + backend_app/main.py | 158 ++++++++++++++++++++++++++++++++ backend_app/simulator.py | 163 ++++++++++++++++++++++++++++++++++ environment.yml | 8 +- tests/test_api_integration.py | 148 ++++++++++++++++++++++++++++++ 7 files changed, 488 insertions(+), 11 deletions(-) create mode 100644 backend_app/README.md create mode 100644 backend_app/main.py create mode 100644 backend_app/simulator.py create mode 100644 tests/test_api_integration.py diff --git a/.gitignore b/.gitignore index 84c949d..2f2fa79 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ SlurmOutput/ temp/ .vscode/ *.ncu-rep +artifacts \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 8a37b06..3c38f75 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,18 +5,21 @@ FROM continuumio/miniconda3 WORKDIR /app # Create the conda environment -COPY environment.yml /app/environment.yml +COPY . /app RUN conda env create -f /app/environment.yml -# Initialize conda in bash shell -RUN echo "source activate llmcompass_ae" > ~/.bashrc +# Do not rely on `source activate` in non-interactive shells; set PATH to the env's bin ENV PATH /opt/conda/envs/llmcompass_ae/bin:$PATH -# Clone your GitHub repository -RUN git clone https://github.com/HenryChang213/LLMCompass_ISCA_AE.git /app/LLMCompass_ISCA_AE -RUN cd /app/LLMCompass_ISCA_AE && git submodule init && git submodule update --recursive +# Install lightweight Python deps for the API server inside the conda env +RUN /opt/conda/envs/llmcompass_ae/bin/pip install \ + fastapi \ + "uvicorn[standard]" \ + aiosqlite \ + requests -# Expose the port your app runs on +# Expose the port your app runs on and run uvicorn as entrypoint EXPOSE 8000 +# NOTE: CMD removed to allow interactive testing. 
Re-add the uvicorn CMD after verification. diff --git a/backend_app/README.md b/backend_app/README.md new file mode 100644 index 0000000..2131874 --- /dev/null +++ b/backend_app/README.md @@ -0,0 +1,4 @@ + +sudo docker build -t llmcompass-backend . + +sudo docker run --rm -it -w /app --name llmcompass llmcompass-backend /bin/bash \ No newline at end of file diff --git a/backend_app/main.py b/backend_app/main.py new file mode 100644 index 0000000..450cff7 --- /dev/null +++ b/backend_app/main.py @@ -0,0 +1,158 @@ +import uuid +import asyncio +import json +import datetime +import os +from typing import Any, List, Union, Optional +from contextlib import asynccontextmanager +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from backend_app.simulator import simulate_kernel_trace, process_kernel_simulation_task + + +@asynccontextmanager +async def lifespan(app: FastAPI): + # in-memory tasks store (not persisted): lost on restart + app.state.tasks = {} + app.state.tasks_lock = asyncio.Lock() + + # create queue and start background worker + app.state.queue = asyncio.Queue() + app.state.worker_task = asyncio.create_task(worker_loop(app.state.queue, app.state.tasks, app.state.tasks_lock)) + + try: + yield + finally: + # shutdown: cancel background worker + worker = getattr(app.state, "worker_task", None) + if worker: + worker.cancel() + try: + await worker + except asyncio.CancelledError: + pass + + +app = FastAPI(title="LLMCompass Kernel Simulator", lifespan=lifespan) + +class KernelTask(BaseModel): + kernel_name: str + op: str + # allow either a flat shape like [M,K,N] or a pair of shapes for A and B + input_dim: Optional[Union[List[int], List[List[int]]]] = None + # some clients send a single dtype string, others send a list of dtype strings + dtype: Optional[Union[str, List[str]]] = "fp32" + # optional system key + system_key: Optional[str] = None + + +async def worker_loop(queue: asyncio.Queue, tasks: dict, lock: asyncio.Lock): + while True: + 
task_id = await queue.get() + try: + async with lock: + entry = tasks.get(task_id) + if not entry: + queue.task_done() + continue + payload = entry["payload"] + # process (outside lock) + result = await process_kernel_simulation_task(payload) + async with lock: + if task_id in tasks: + tasks[task_id]["status"] = "done" + tasks[task_id]["result"] = result + tasks[task_id]["updated_at"] = datetime.datetime.utcnow().isoformat() + except Exception as e: + async with lock: + if task_id in tasks: + tasks[task_id]["status"] = "failed" + tasks[task_id]["result"] = {"error": str(e)} + tasks[task_id]["updated_at"] = datetime.datetime.utcnow().isoformat() + finally: + queue.task_done() + + +@app.post("/tasks") +async def create_task(t: KernelTask, wait: bool = False, timeout: float = 30.0): + """ + Create a kernel simulation task. + If `wait` is False (default) the task is queued and returns immediately with status queued. + If `wait` is True the request will block up to `timeout` seconds and return the final status/result. 
+ """ + task_id = str(uuid.uuid4()) + payload = t.dict() + created_at = datetime.datetime.utcnow().isoformat() + + # insert into in-memory store + async with app.state.tasks_lock: + app.state.tasks[task_id] = { + "payload": payload, + "status": "queued", + "result": None, + "created_at": created_at, + "updated_at": created_at, + } + + if not wait: + # enqueue for background processing + await app.state.queue.put(task_id) + return {"task_id": task_id, "status": "queued"} + + # synchronous path: process inline with timeout + try: + result = await asyncio.wait_for(process_kernel_simulation_task(payload), timeout=timeout) + except asyncio.TimeoutError: + # leave as queued for background worker to pick up later + return {"task_id": task_id, "status": "timeout", "message": f"processing did not finish within {timeout}s"} + except Exception as e: + # update in-memory store as failed + async with app.state.tasks_lock: + if task_id in app.state.tasks: + app.state.tasks[task_id]["status"] = "failed" + app.state.tasks[task_id]["result"] = {"error": str(e)} + app.state.tasks[task_id]["updated_at"] = datetime.datetime.utcnow().isoformat() + raise HTTPException(status_code=500, detail=str(e)) + + # write result into in-memory store and return + async with app.state.tasks_lock: + if task_id in app.state.tasks: + app.state.tasks[task_id]["status"] = "done" + app.state.tasks[task_id]["result"] = result + app.state.tasks[task_id]["updated_at"] = datetime.datetime.utcnow().isoformat() + + return {"task_id": task_id, "status": "done", "result": result} + + +@app.get("/supported_ops") +async def supported_ops(): + from backend_app.simulator import get_supported_ops + + return {"supported_ops": get_supported_ops()} + + +@app.get("/tasks/{task_id}") +async def get_task(task_id: str): + async with app.state.tasks_lock: + entry = app.state.tasks.get(task_id) + if not entry: + raise HTTPException(status_code=404, detail="task not found") + status = entry.get("status") + result = 
entry.get("result") + payload = entry.get("payload") + created_at = entry.get("created_at") + updated_at = entry.get("updated_at") + + return { + "task_id": task_id, + "status": status, + "result": result, + "payload": payload, + "created_at": created_at, + "updated_at": updated_at, + } + + +@app.get("/health") +async def health(): + return {"status": "ok"} diff --git a/backend_app/simulator.py b/backend_app/simulator.py new file mode 100644 index 0000000..84a5cfe --- /dev/null +++ b/backend_app/simulator.py @@ -0,0 +1,163 @@ +import asyncio +import time +import re +from typing import Any, Dict + +# import real software models and hardware/system descriptions +from software_model.utils import Tensor, data_type_dict +from software_model.matmul import Matmul, BatchedMatmul +from software_model.softmax import Softmax +from software_model.layernorm import LayerNorm +from software_model.gelu import GeLU +from software_model.operators import Operator +from hardware_model.system import system_dict + + +def _map_dtype(dtype_str: str): + s = dtype_str.lower() + if "fp16" in s or "float16" in s: + return data_type_dict.get("fp16") + else: + return None + +def _make_tensor(shape, dtype_obj): + try: + return Tensor(list(shape), dtype_obj) + except Exception: + return None + + +def _simulate_matmul_sync(kernel_name: str, input_dim: list[list], dtype_str: list[str], system_key: str = None) -> Dict[str, Any]: + # normalize dtype input + dt = _map_dtype(dtype_str[0]) + A_shape = input_dim[0] + B_shape = input_dim[1] + + if dt is None or A_shape is None or B_shape is None: + return { + "status": "failed", + "output": None, + "time_taken": None, + "metadata": { + "kernel_name": kernel_name, + "error": "invalid dtype or input_dim - cannot simulate matmul", + "error_code": "INVALID_INPUT", + "hint": "Check dtype and input_dim in the request", + }, + } + + op = Matmul(dt) + A = _make_tensor(A_shape, dt) + B = _make_tensor(B_shape, dt) + _ = op(A, B) + + # require valid system_key and 
resolve device safely + if not system_key: + return { + "status": "failed", + "output": None, + "time_taken": None, + "metadata": { + "kernel_name": kernel_name, + "error": "no system_key provided - cannot simulate matmul", + "error_code": "NO_SYSTEM", + "hint": "Provide a valid system_key in the request", + }, + } + system = system_dict.get(system_key) + if system is None: + return { + "status": "failed", + "output": None, + "time_taken": None, + "metadata": { + "kernel_name": kernel_name, + "error": f"missing system configuration '{system_key}' - cannot simulate matmul", + "error_code": "NO_SYSTEM", + "hint": "Use a key from hardware_model.system.system_dict", + }, + } + + device = system.device + latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") + return { + "status": "success", + "output": {"summary": "matmul simulated"}, + "time_taken": float(latency), + "metadata": {"kernel_name": kernel_name, "input_dim": input_dim, "dtype": dtype_str}, + } + +def _simulate_fail(kernel_name: str, _input_dim=None, _dtype_str: str = "") -> Dict[str, Any]: + # generic fallback is not allowed per policy; return explicit failure + return { + "status": "failed", + "output": None, + "time_taken": None, + "metadata": { + "kernel_name": kernel_name, + "error": "unsupported op - no generic simulator available", + "error_code": "UNSUPPORTED_OP", + "hint": "Implement a simulator for this kernel in software_model or submit with a supported op" + }, + } + + +def _select_sync_simulator(kernel_name: str): + if not kernel_name: + return _simulate_fail + kn = kernel_name.lower() + if "matmul" in kn: + return _simulate_matmul_sync + # conv and other ops are not supported unless explicitly implemented + return _simulate_fail + + +async def simulate_kernel_trace(kernel_name: str, op: str, input_dim: list[list], dtype: list[str], system_key: str) -> Dict[str, Any]: + """ + Dispatch to real software_model simulations. 
Blocking compile_and_simulate calls are executed + in a thread via asyncio.to_thread so the event loop is not blocked. + Returns standardized result: {status, output, time_taken, metadata} + """ + # prefer op if provided (more specific), else fall back to kernel_name + selector = op if op else kernel_name + simulator = _select_sync_simulator(selector) + # run simulator in thread; pass system_key when simulator expects it + result = await asyncio.to_thread(simulator, kernel_name, input_dim, dtype, system_key) + + # simulator returns dict; validate and propagate structured failure + if not isinstance(result, dict): + return {"status": "failed", "output": None, "time_taken": None, "metadata": {"kernel_name": kernel_name, "error": "simulator returned invalid result type", "error_code": "SIMULATOR_ERROR", "hint": "Check simulator implementation"}} + # ensure failure results have error_code/hint when possible + if result.get("status") == "failed": + md = result.setdefault("metadata", {}) + md.setdefault("error_code", md.get("error_code", "SIMULATOR_FAILED")) + md.setdefault("hint", md.get("hint", "See simulator logs for details")) + return result + + +def get_supported_ops() -> list: + """Return list of supported op keywords for routing/diagnostics.""" + return ["gelu", "layernorm", "matmul", "softmax"] + + +async def process_kernel_simulation_task(kernel_task: Dict[str, Any]) -> Dict[str, Any]: + """ + Public entrypoint used by the worker. Calls simulate_kernel_trace and normalizes output. 
+ """ + kernel_name = kernel_task.get("kernel_name", "") + op = kernel_task.get("op", "") + input_dim = kernel_task.get("input_dim", []) + dtype = kernel_task.get("dtype", []) + system_key = kernel_task.get("system_key") + start = time.time() + res = await simulate_kernel_trace(kernel_name, op, input_dim, dtype, system_key=system_key) + end = time.time() + # normalize to expected schema + out = { + "kernel_name": kernel_name, + "status": res.get("status", "failed"), + "output": res.get("output"), + "time_taken": res.get("time_taken", end - start), + "metadata": res.get("metadata", {"op": op, "input_dim": input_dim, "dtype": dtype}), + } + return out diff --git a/environment.yml b/environment.yml index b168460..e7bfae2 100644 --- a/environment.yml +++ b/environment.yml @@ -1,12 +1,12 @@ name: llmcompass_ae channels: - - pytorch - defaults dependencies: - python=3.9 - - pytorch - pip: - - scalesim + - torch==2.5.1 + - scalesim==2.0.2 - matplotlib - seaborn - - scipy \ No newline at end of file + - scipy + - pytest \ No newline at end of file diff --git a/tests/test_api_integration.py b/tests/test_api_integration.py new file mode 100644 index 0000000..8a584cf --- /dev/null +++ b/tests/test_api_integration.py @@ -0,0 +1,148 @@ +import os +import time +import subprocess +import sys +import signal +import json +from pathlib import Path + +import pytest +import requests + + +# Server config: if API_URL is set we will target that and not start a server. +SERVER_HOST = "127.0.0.1" +SERVER_PORT = int(os.environ.get("API_PORT", "8000")) +BASE = os.environ.get("API_URL", f"http://{SERVER_HOST}:{SERVER_PORT}") + +# artifacts directory for intermediate results +ARTIFACT_DIR = Path(os.environ.get("ARTIFACT_DIR", "artifacts")) + + +def _ensure_artifacts_dir(): + ARTIFACT_DIR.mkdir(parents=True, exist_ok=True) + + +@pytest.fixture(scope="session", autouse=True) +def server(): + """Start a uvicorn server for the test session when API_URL is not set. 
+ + If API_URL is provided (pointing to an external service), the fixture does nothing. + """ + _ensure_artifacts_dir() + if os.environ.get("API_URL"): + # External server provided, do not start local uvicorn. + yield + return + + cmd = [sys.executable, "-m", "uvicorn", "backend_app.main:app", "--host", SERVER_HOST, "--port", str(SERVER_PORT)] + + # redirect uvicorn output to artifact files + out_path = ARTIFACT_DIR / "uvicorn.out" + err_path = ARTIFACT_DIR / "uvicorn.err" + fout = open(out_path, "wb") + ferr = open(err_path, "wb") + proc = subprocess.Popen(cmd, stdout=fout, stderr=ferr, preexec_fn=None) + + # wait for health endpoint + deadline = time.time() + 10 + while time.time() < deadline: + try: + r = requests.get(f"http://{SERVER_HOST}:{SERVER_PORT}/health", timeout=1) + if r.status_code == 200: + break + except Exception: + pass + time.sleep(0.1) + else: + # failed to start in time; capture stderr for debugging + try: + ferr.flush() + with open(err_path, "rb") as f: + err = f.read() + except Exception: + proc.kill() + err = b"(no stderr available)" + fout.close() + ferr.close() + raise RuntimeError(f"uvicorn failed to start in time. 
stderr:\n{err.decode(errors='ignore')}") + + try: + yield + finally: + # terminate the server process + proc.terminate() + try: + proc.wait(timeout=5) + except Exception: + proc.kill() + proc.wait() + fout.close() + ferr.close() + + +def _url(path: str) -> str: + return BASE.rstrip("/") + path + + +@pytest.mark.integration +def test_health(): + r = requests.get(_url("/health"), timeout=5) + with open(ARTIFACT_DIR / "health.json", "w") as f: + json.dump({"status_code": r.status_code, "body": r.json()}, f, indent=2) + assert r.status_code == 200 + assert r.json().get("status") == "ok" + + +@pytest.mark.integration +def test_supported_ops(): + r = requests.get(_url("/supported_ops"), timeout=5) + with open(ARTIFACT_DIR / "supported_ops.json", "w") as f: + json.dump({"status_code": r.status_code, "body": r.json()}, f, indent=2) + assert r.status_code == 200 + j = r.json() + assert isinstance(j.get("supported_ops"), list) + +@pytest.mark.integration +def test_create_task_and_poll_matmul(): + """Submit a matmul task and poll for completion; save artifacts for debugging.""" + + payload = {"kernel_name": "itest_matmul", + "op": "matmul", + "input_dim": [[1, 2048], [2048, 7168]], + "dtype": ['c10::BFloat16', 'c10::BFloat16'], + "system_key": "A100_4_fp16"} + + with open(ARTIFACT_DIR / "matmul_create_task_request.json", "w") as f: + json.dump({"url": _url("/tasks"), "payload": payload, "op": "matmul"}, f, indent=2) + + r = requests.post(_url("/tasks"), json=payload, timeout=5) + with open(ARTIFACT_DIR / "matmul_create_task_response.json", "w") as f: + try: + body = r.json() + except Exception: + body = {"text": r.text} + json.dump({"status_code": r.status_code, "body": body}, f, indent=2) + + assert r.status_code == 200 + j = r.json() + task_id = j.get("task_id") + assert task_id + + # poll briefly for terminal status + deadline = time.time() + 20 + last = None + while time.time() < deadline: + r = requests.get(_url(f"/tasks/{task_id}"), timeout=5) + if r.status_code == 200: 
+ info = r.json() + status = info.get("status") + with open(ARTIFACT_DIR / f"task_{task_id}_poll_matmul.json", "w") as f: + json.dump({"status_code": r.status_code, "body": info}, f, indent=2) + if status == "done": + assert "result" in info + break + last = status + time.sleep(1) + + assert last in ("done", "failed", "queued", None) From c1fb334c7a817cbebe83d330e21b696e57df2733 Mon Sep 17 00:00:00 2001 From: zhangt Date: Tue, 16 Sep 2025 19:52:24 -0700 Subject: [PATCH 02/15] Cover all ops in scheduler, simulator and tests --- Dockerfile | 4 +- backend_app/README.md | 154 ++++++++++++++++- backend_app/main.py | 35 +++- backend_app/scheduler.py | 62 +++++++ backend_app/sim_utils.py | 44 +++++ backend_app/simulator.py | 163 ----------------- backend_app/sync_simulators.py | 243 ++++++++++++++++++++++++++ tests/test_api_integration.py | 307 +++++++++++++++++++++++++++++++-- 8 files changed, 824 insertions(+), 188 deletions(-) create mode 100644 backend_app/scheduler.py create mode 100644 backend_app/sim_utils.py delete mode 100644 backend_app/simulator.py create mode 100644 backend_app/sync_simulators.py diff --git a/Dockerfile b/Dockerfile index 3c38f75..92f5532 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,6 +20,6 @@ RUN /opt/conda/envs/llmcompass_ae/bin/pip install \ # Expose the port your app runs on and run uvicorn as entrypoint EXPOSE 8000 -# NOTE: CMD removed to allow interactive testing. Re-add the uvicorn CMD after verification. - +# Start the FastAPI server using Uvicorn when the container launches +CMD ["uvicorn", "backend_app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/backend_app/README.md b/backend_app/README.md index 2131874..52ed3db 100644 --- a/backend_app/README.md +++ b/backend_app/README.md @@ -1,4 +1,156 @@ +# LLMCompass backend_app +This directory contains a minimal backend HTTP service (FastAPI + simulation scheduler). 
+This README explains how to build and run the backend (Docker-only), how to call the API, +and how to extend the codebase with new synchronous simulators. + +## Prerequisites +- Docker (required runtime) +- Python 3.8+ (for development/testing inside the image) + +## Docker (build & run) + +The backend is supported to run only via Docker. Build the image from the repository root: + +```bash sudo docker build -t llmcompass-backend . +``` + +Run an interactive container with the repository mounted to `/app` (recommended for development): + +```bash +sudo docker run --rm -it -w /app --name llmcompass -v "$PWD":/app llmcompass-backend /bin/bash +``` + +## Environment variables +- `API_PORT` — port used by tests/uvicorn inside the container (default: 8000) +- `API_URL` — if set in tests, the test suite will target this external URL instead of starting a local server +- `ARTIFACT_DIR` — directory where tests write artifacts (default: `artifacts/`) + +## HTTP API endpoints +- `GET /health` — health check, returns `{status: "ok"}` +- `GET /supported_ops` — list of supported operations (e.g. 
`matmul`, `gelu`) +- `POST /tasks` — submit a simulation task (returns `task_id`) +- `GET /tasks/{task_id}` — query task status and result + +Example task payload (matmul): + +```json +{ + "kernel_name": "itest_matmul", + "op": "matmul", + "input_dim": [[1, 2048], [2048, 7168]], + "dtype": ["c10::BFloat16", "c10::BFloat16"], + "system_key": "A100_4_fp16" +} +``` + +Example finished task (matmul): +```json +{ + "status_code": 200, + "body": { + "task_id": "089d0b13-2ef9-43e1-bde3-44ed7219e959", + "status": "done", + "result": { + "kernel_name": "itest_matmul_M_1", + "status": "success", + "output": { + "summary": "matmul simulated" + }, + "time_taken": 1.4408317802844531e-05, + "metadata": { + "kernel_name": "itest_matmul_M_1", + "input_dim": [ + [ + 1, + 2048 + ], + [ + 2048, + 7168 + ] + ], + "dtype": [ + "c10::BFloat16", + "c10::BFloat16" + ] + } + }, + "payload": { + "kernel_name": "itest_matmul_M_1", + "op": "matmul", + "input_dim": [ + [ + 1, + 2048 + ], + [ + 2048, + 7168 + ] + ], + "dtype": [ + "c10::BFloat16", + "c10::BFloat16" + ], + "system_key": "A100_4_fp16" + }, + "created_at": "2025-09-17T02:23:11.777675", + "updated_at": "2025-09-17T02:23:11.778457" + } +} +``` + + +Task states may include `queued`, `running`, `done`, `failed` (scheduler/worker dependent). + +## Code layout and runtime flow + +Key modules: +- `backend_app/simulator.py` — async entry points and dispatcher (`simulate_kernel_trace`, `process_kernel_simulation_task`). +- `backend_app/sim_utils.py` — shared helpers: dtype mapping, tensor construction, unified failure response helper `_make_failure`. +- `backend_app/sync_simulators.py` — synchronous `_simulate_*` implementations (e.g. `_simulate_matmul_sync`) and `_select_sync_simulator`. + +Runtime flow (simplified): +1. Worker receives a task, constructs `kernel_task` dict and calls `process_kernel_simulation_task`. +2. `process_kernel_simulation_task` calls `simulate_kernel_trace` (async). +3. 
`simulate_kernel_trace` selects a synchronous implementation via `_select_sync_simulator` and runs it in a thread using `asyncio.to_thread` to avoid blocking the event loop. +4. Synchronous implementations perform compile/simulate and return a standardized dict: `{status, output, time_taken, metadata}`. + +All failure responses are created via `_make_failure(kernel_name, error, error_code)` to keep format consistent. + +## Adding a new synchronous simulator + +1. Implement a new `_simulate__sync` function in `backend_app/sync_simulators.py` with the same signature as existing ones: + +```py +def _simulate_conv_sync(kernel_name, input_dim, dtype_str, system_key=None): + # use backend_app.sim_utils helpers: _map_dtype, _make_tensor, _make_failure + ... + return {"status": "success", ...} +``` + +2. Update `_select_sync_simulator` in the same file to return your function when appropriate (e.g. `if "conv" in kn:`). + +3. Optionally add the op keyword to `get_supported_ops()` in `backend_app/simulator.py`. + +4. Add unit and/or integration tests to cover happy-path and failure cases. + +5. If new dependencies are required, update `requirements.txt` / `pyproject.toml` and Dockerfile. + +Important: keep the synchronous implementation's return schema consistent so the async wrapper can handle it uniformly. + +## Error codes + +Error codes are currently plain strings (e.g. `INVALID_INPUT`, `NO_SYSTEM`, `SIMULATOR_ERROR`). + +## Tests + +Run tests from the repository root inside the Docker container or a development image: + +```bash +pytest tests/ +``` -sudo docker run --rm -it -w /app --name llmcompass llmcompass-backend /bin/bash \ No newline at end of file +Integration tests write artifacts to the directory specified by `ARTIFACT_DIR` to aid debugging. 
\ No newline at end of file diff --git a/backend_app/main.py b/backend_app/main.py index 450cff7..d1b9c3d 100644 --- a/backend_app/main.py +++ b/backend_app/main.py @@ -7,7 +7,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException from pydantic import BaseModel -from backend_app.simulator import simulate_kernel_trace, process_kernel_simulation_task +from backend_app.scheduler import simulate_kernel_trace, process_kernel_simulation_task @asynccontextmanager @@ -18,7 +18,9 @@ async def lifespan(app: FastAPI): # create queue and start background worker app.state.queue = asyncio.Queue() - app.state.worker_task = asyncio.create_task(worker_loop(app.state.queue, app.state.tasks, app.state.tasks_lock)) + app.state.worker_task = asyncio.create_task( + worker_loop(app.state.queue, app.state.tasks, app.state.tasks_lock) + ) try: yield @@ -35,6 +37,7 @@ async def lifespan(app: FastAPI): app = FastAPI(title="LLMCompass Kernel Simulator", lifespan=lifespan) + class KernelTask(BaseModel): kernel_name: str op: str @@ -62,13 +65,17 @@ async def worker_loop(queue: asyncio.Queue, tasks: dict, lock: asyncio.Lock): if task_id in tasks: tasks[task_id]["status"] = "done" tasks[task_id]["result"] = result - tasks[task_id]["updated_at"] = datetime.datetime.utcnow().isoformat() + tasks[task_id][ + "updated_at" + ] = datetime.datetime.utcnow().isoformat() except Exception as e: async with lock: if task_id in tasks: tasks[task_id]["status"] = "failed" tasks[task_id]["result"] = {"error": str(e)} - tasks[task_id]["updated_at"] = datetime.datetime.utcnow().isoformat() + tasks[task_id][ + "updated_at" + ] = datetime.datetime.utcnow().isoformat() finally: queue.task_done() @@ -101,17 +108,25 @@ async def create_task(t: KernelTask, wait: bool = False, timeout: float = 30.0): # synchronous path: process inline with timeout try: - result = await asyncio.wait_for(process_kernel_simulation_task(payload), timeout=timeout) + result = await asyncio.wait_for( + 
process_kernel_simulation_task(payload), timeout=timeout + ) except asyncio.TimeoutError: # leave as queued for background worker to pick up later - return {"task_id": task_id, "status": "timeout", "message": f"processing did not finish within {timeout}s"} + return { + "task_id": task_id, + "status": "timeout", + "message": f"processing did not finish within {timeout}s", + } except Exception as e: # update in-memory store as failed async with app.state.tasks_lock: if task_id in app.state.tasks: app.state.tasks[task_id]["status"] = "failed" app.state.tasks[task_id]["result"] = {"error": str(e)} - app.state.tasks[task_id]["updated_at"] = datetime.datetime.utcnow().isoformat() + app.state.tasks[task_id][ + "updated_at" + ] = datetime.datetime.utcnow().isoformat() raise HTTPException(status_code=500, detail=str(e)) # write result into in-memory store and return @@ -119,14 +134,16 @@ async def create_task(t: KernelTask, wait: bool = False, timeout: float = 30.0): if task_id in app.state.tasks: app.state.tasks[task_id]["status"] = "done" app.state.tasks[task_id]["result"] = result - app.state.tasks[task_id]["updated_at"] = datetime.datetime.utcnow().isoformat() + app.state.tasks[task_id][ + "updated_at" + ] = datetime.datetime.utcnow().isoformat() return {"task_id": task_id, "status": "done", "result": result} @app.get("/supported_ops") async def supported_ops(): - from backend_app.simulator import get_supported_ops + from backend_app.sim_utils import get_supported_ops return {"supported_ops": get_supported_ops()} diff --git a/backend_app/scheduler.py b/backend_app/scheduler.py new file mode 100644 index 0000000..26d280a --- /dev/null +++ b/backend_app/scheduler.py @@ -0,0 +1,62 @@ +import asyncio +import time +from typing import Any, Dict + +# top-level async dispatcher imports helpers and sync simulators from smaller modules +from backend_app.sim_utils import _make_failure +from backend_app.sync_simulators import _select_sync_simulator + + +async def 
simulate_kernel_trace( + kernel_name: str, op: str, input_dim: list[list], dtype: list[str], system_key: str +) -> Dict[str, Any]: + """ + Dispatch to real software_model simulations. Blocking compile_and_simulate calls are executed + in a thread via asyncio.to_thread so the event loop is not blocked. + Returns standardized result: {status, output, time_taken, metadata} + """ + # prefer op if provided (more specific), else fall back to kernel_name + selector = op if op else kernel_name + simulator = _select_sync_simulator(selector) + # run simulator in thread; pass system_key when simulator expects it + result = await asyncio.to_thread( + simulator, kernel_name, input_dim, dtype, system_key + ) + + # simulator returns dict; validate and propagate structured failure + if not isinstance(result, dict): + return _make_failure( + kernel_name, "simulator returned invalid result type", "SIMULATOR_ERROR" + ) + # ensure failure results have error_code when possible + if result.get("status") == "failed": + md = result.setdefault("metadata", {}) + md.setdefault("error_code", md.get("error_code", "SIMULATOR_FAILED")) + return result + + +async def process_kernel_simulation_task(kernel_task: Dict[str, Any]) -> Dict[str, Any]: + """ + Public entrypoint used by the worker. Calls simulate_kernel_trace and normalizes output. 
+ """ + kernel_name = kernel_task.get("kernel_name", "") + op = kernel_task.get("op", "") + input_dim = kernel_task.get("input_dim", []) + dtype = kernel_task.get("dtype", []) + system_key = kernel_task.get("system_key") + start = time.time() + res = await simulate_kernel_trace( + kernel_name, op, input_dim, dtype, system_key=system_key + ) + end = time.time() + # normalize to expected schema + out = { + "kernel_name": kernel_name, + "status": res.get("status", "failed"), + "output": res.get("output"), + "time_taken": res.get("time_taken", end - start), + "metadata": res.get( + "metadata", {"op": op, "input_dim": input_dim, "dtype": dtype} + ), + } + return out diff --git a/backend_app/sim_utils.py b/backend_app/sim_utils.py new file mode 100644 index 0000000..389537b --- /dev/null +++ b/backend_app/sim_utils.py @@ -0,0 +1,44 @@ +from typing import Any + +# import minimal software/hardware helpers used by simulators +from software_model.utils import Tensor, data_type_dict + + +def _map_dtype(dtype_str: str): + s = dtype_str.lower() + if "fp16" in s or "float16" in s: + return data_type_dict.get("fp16") + else: + return None + + +def _make_tensor(shape, dtype_obj): + try: + return Tensor(list(shape), dtype_obj) + except Exception: + return None + + +# centralized failure helper to avoid repeated dict literals +def _make_failure(kernel_name: str, error: str, error_code: str): + return { + "status": "failed", + "output": None, + "time_taken": None, + "metadata": { + "kernel_name": kernel_name, + "error": error, + "error_code": error_code, + }, + } + + +def _make_missing_system(kernel_name: str, system_key: str): + return _make_failure( + kernel_name, f"missing system configuration '{system_key}'", "NO_SYSTEM" + ) + + +def get_supported_ops() -> list: + """Return list of supported op keywords for routing/diagnostics.""" + return ["gelu", "layernorm", "matmul", "softmax"] diff --git a/backend_app/simulator.py b/backend_app/simulator.py deleted file mode 100644 index 
84a5cfe..0000000 --- a/backend_app/simulator.py +++ /dev/null @@ -1,163 +0,0 @@ -import asyncio -import time -import re -from typing import Any, Dict - -# import real software models and hardware/system descriptions -from software_model.utils import Tensor, data_type_dict -from software_model.matmul import Matmul, BatchedMatmul -from software_model.softmax import Softmax -from software_model.layernorm import LayerNorm -from software_model.gelu import GeLU -from software_model.operators import Operator -from hardware_model.system import system_dict - - -def _map_dtype(dtype_str: str): - s = dtype_str.lower() - if "fp16" in s or "float16" in s: - return data_type_dict.get("fp16") - else: - return None - -def _make_tensor(shape, dtype_obj): - try: - return Tensor(list(shape), dtype_obj) - except Exception: - return None - - -def _simulate_matmul_sync(kernel_name: str, input_dim: list[list], dtype_str: list[str], system_key: str = None) -> Dict[str, Any]: - # normalize dtype input - dt = _map_dtype(dtype_str[0]) - A_shape = input_dim[0] - B_shape = input_dim[1] - - if dt is None or A_shape is None or B_shape is None: - return { - "status": "failed", - "output": None, - "time_taken": None, - "metadata": { - "kernel_name": kernel_name, - "error": "invalid dtype or input_dim - cannot simulate matmul", - "error_code": "INVALID_INPUT", - "hint": "Check dtype and input_dim in the request", - }, - } - - op = Matmul(dt) - A = _make_tensor(A_shape, dt) - B = _make_tensor(B_shape, dt) - _ = op(A, B) - - # require valid system_key and resolve device safely - if not system_key: - return { - "status": "failed", - "output": None, - "time_taken": None, - "metadata": { - "kernel_name": kernel_name, - "error": "no system_key provided - cannot simulate matmul", - "error_code": "NO_SYSTEM", - "hint": "Provide a valid system_key in the request", - }, - } - system = system_dict.get(system_key) - if system is None: - return { - "status": "failed", - "output": None, - "time_taken": None, - 
"metadata": { - "kernel_name": kernel_name, - "error": f"missing system configuration '{system_key}' - cannot simulate matmul", - "error_code": "NO_SYSTEM", - "hint": "Use a key from hardware_model.system.system_dict", - }, - } - - device = system.device - latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") - return { - "status": "success", - "output": {"summary": "matmul simulated"}, - "time_taken": float(latency), - "metadata": {"kernel_name": kernel_name, "input_dim": input_dim, "dtype": dtype_str}, - } - -def _simulate_fail(kernel_name: str, _input_dim=None, _dtype_str: str = "") -> Dict[str, Any]: - # generic fallback is not allowed per policy; return explicit failure - return { - "status": "failed", - "output": None, - "time_taken": None, - "metadata": { - "kernel_name": kernel_name, - "error": "unsupported op - no generic simulator available", - "error_code": "UNSUPPORTED_OP", - "hint": "Implement a simulator for this kernel in software_model or submit with a supported op" - }, - } - - -def _select_sync_simulator(kernel_name: str): - if not kernel_name: - return _simulate_fail - kn = kernel_name.lower() - if "matmul" in kn: - return _simulate_matmul_sync - # conv and other ops are not supported unless explicitly implemented - return _simulate_fail - - -async def simulate_kernel_trace(kernel_name: str, op: str, input_dim: list[list], dtype: list[str], system_key: str) -> Dict[str, Any]: - """ - Dispatch to real software_model simulations. Blocking compile_and_simulate calls are executed - in a thread via asyncio.to_thread so the event loop is not blocked. 
- Returns standardized result: {status, output, time_taken, metadata} - """ - # prefer op if provided (more specific), else fall back to kernel_name - selector = op if op else kernel_name - simulator = _select_sync_simulator(selector) - # run simulator in thread; pass system_key when simulator expects it - result = await asyncio.to_thread(simulator, kernel_name, input_dim, dtype, system_key) - - # simulator returns dict; validate and propagate structured failure - if not isinstance(result, dict): - return {"status": "failed", "output": None, "time_taken": None, "metadata": {"kernel_name": kernel_name, "error": "simulator returned invalid result type", "error_code": "SIMULATOR_ERROR", "hint": "Check simulator implementation"}} - # ensure failure results have error_code/hint when possible - if result.get("status") == "failed": - md = result.setdefault("metadata", {}) - md.setdefault("error_code", md.get("error_code", "SIMULATOR_FAILED")) - md.setdefault("hint", md.get("hint", "See simulator logs for details")) - return result - - -def get_supported_ops() -> list: - """Return list of supported op keywords for routing/diagnostics.""" - return ["gelu", "layernorm", "matmul", "softmax"] - - -async def process_kernel_simulation_task(kernel_task: Dict[str, Any]) -> Dict[str, Any]: - """ - Public entrypoint used by the worker. Calls simulate_kernel_trace and normalizes output. 
- """ - kernel_name = kernel_task.get("kernel_name", "") - op = kernel_task.get("op", "") - input_dim = kernel_task.get("input_dim", []) - dtype = kernel_task.get("dtype", []) - system_key = kernel_task.get("system_key") - start = time.time() - res = await simulate_kernel_trace(kernel_name, op, input_dim, dtype, system_key=system_key) - end = time.time() - # normalize to expected schema - out = { - "kernel_name": kernel_name, - "status": res.get("status", "failed"), - "output": res.get("output"), - "time_taken": res.get("time_taken", end - start), - "metadata": res.get("metadata", {"op": op, "input_dim": input_dim, "dtype": dtype}), - } - return out diff --git a/backend_app/sync_simulators.py b/backend_app/sync_simulators.py new file mode 100644 index 0000000..a23416e --- /dev/null +++ b/backend_app/sync_simulators.py @@ -0,0 +1,243 @@ +from typing import Any, Dict + +from backend_app.sim_utils import ( + _map_dtype, + _make_tensor, + _make_failure, + _make_missing_system, +) +from software_model.matmul import Matmul, BatchedMatmul +from software_model.softmax import Softmax +from software_model.layernorm import LayerNorm +from software_model.gelu import GeLU +from hardware_model.system import system_dict + + +def _simulate_matmul_sync( + kernel_name: str, + input_dim: list[list], + dtype_str: list[str], + system_key: str = None, +) -> Dict[str, Any]: + dt = _map_dtype(dtype_str[0]) + A_shape = input_dim[0] + B_shape = input_dim[1] + + if dt is None: + return _make_failure( + kernel_name, "invalid or unsupported dtype", "INVALID_INPUT" + ) + elif A_shape is None or B_shape is None: + return _make_failure(kernel_name, "invalid input dimension", "INVALID_INPUT") + + op = Matmul(dt) + A = _make_tensor(A_shape, dt) + B = _make_tensor(B_shape, dt) + _ = op(A, B) + + if not system_key: + return _make_failure(kernel_name, "no valid system_key provided", "NO_SYSTEM") + system = system_dict.get(system_key) + if system is None: + return _make_missing_system(kernel_name, 
system_key) + + device = system.device + latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") + return { + "status": "success", + "output": {"summary": "matmul simulated"}, + "time_taken": float(latency), + "metadata": { + "kernel_name": kernel_name, + "input_dim": input_dim, + "dtype": dtype_str, + }, + } + + +def _simulate_bmm_sync( + kernel_name: str, + input_dim: list[list], + dtype_str: list[str], + system_key: str = None, +) -> Dict[str, Any]: + dt = _map_dtype(dtype_str[0]) + A_shape = input_dim[0] + B_shape = input_dim[1] + + if dt is None: + return _make_failure( + kernel_name, "invalid or unsupported dtype", "INVALID_INPUT" + ) + elif A_shape is None or B_shape is None: + return _make_failure(kernel_name, "invalid input dimension", "INVALID_INPUT") + + op = BatchedMatmul(dt) + A = _make_tensor(A_shape, dt) + B = _make_tensor(B_shape, dt) + _ = op(A, B) + + if not system_key: + return _make_failure(kernel_name, "no valid system_key provided", "NO_SYSTEM") + system = system_dict.get(system_key) + if system is None: + return _make_missing_system(kernel_name, system_key) + + device = system.device + latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") + return { + "status": "success", + "output": {"summary": "matmul simulated"}, + "time_taken": float(latency), + "metadata": { + "kernel_name": kernel_name, + "input_dim": input_dim, + "dtype": dtype_str, + }, + } + + +def _simulate_layernorm_sync( + kernel_name: str, + input_dim: list, + dtype_str: str, + system_key: str = None, +) -> Dict[str, Any]: + dt = _map_dtype(dtype_str) + A_shape = input_dim + + if dt is None: + return _make_failure( + kernel_name, "invalid or unsupported dtype", "INVALID_INPUT" + ) + elif A_shape is None: + return _make_failure(kernel_name, "invalid input dimension", "INVALID_INPUT") + + op = LayerNorm(dt) + A = _make_tensor(A_shape, dt) + _ = op(A) + + if not system_key: + return _make_failure(kernel_name, "no valid system_key provided", 
"NO_SYSTEM") + system = system_dict.get(system_key) + if system is None: + return _make_missing_system(kernel_name, system_key) + + device = system.device + latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") + return { + "status": "success", + "output": {"summary": "LayerNorm simulated"}, + "time_taken": float(latency), + "metadata": { + "kernel_name": kernel_name, + "input_dim": input_dim, + "dtype": dtype_str, + }, + } + + +def _simulate_gelu_sync( + kernel_name: str, + input_dim: list, + dtype_str: str, + system_key: str = None, +) -> Dict[str, Any]: + dt = _map_dtype(dtype_str) + A_shape = input_dim + + if dt is None: + return _make_failure( + kernel_name, "invalid or unsupported dtype", "INVALID_INPUT" + ) + elif A_shape is None: + return _make_failure(kernel_name, "invalid input dimension", "INVALID_INPUT") + + op = GeLU(dt) + A = _make_tensor(A_shape, dt) + _ = op(A) + + if not system_key: + return _make_failure(kernel_name, "no valid system_key provided", "NO_SYSTEM") + system = system_dict.get(system_key) + if system is None: + return _make_missing_system(kernel_name, system_key) + + device = system.device + latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") + return { + "status": "success", + "output": {"summary": "LayerNorm simulated"}, + "time_taken": float(latency), + "metadata": { + "kernel_name": kernel_name, + "input_dim": input_dim, + "dtype": dtype_str, + }, + } + + +def _simulate_softmax_sync( + kernel_name: str, + input_dim: list, + dtype_str: str, + system_key: str = None, +) -> Dict[str, Any]: + dt = _map_dtype(dtype_str) + A_shape = input_dim + + if dt is None: + return _make_failure( + kernel_name, "invalid or unsupported dtype", "INVALID_INPUT" + ) + elif A_shape is None: + return _make_failure(kernel_name, "invalid input dimension", "INVALID_INPUT") + + op = Softmax(dt) + A = _make_tensor(A_shape, dt) + _ = op(A) + + if not system_key: + return _make_failure(kernel_name, "no valid system_key
provided", "NO_SYSTEM") + system = system_dict.get(system_key) + if system is None: + return _make_missing_system(kernel_name, system_key) + + device = system.device + latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") + return { + "status": "success", + "output": {"summary": "LayerNorm simulated"}, + "time_taken": float(latency), + "metadata": { + "kernel_name": kernel_name, + "input_dim": input_dim, + "dtype": dtype_str, + }, + } + + +def _simulate_fail( + kernel_name: str, _input_dim=None, _dtype_str: str = "" +) -> Dict[str, Any]: + return _make_failure( + kernel_name, "unsupported op - no generic simulator available", "UNSUPPORTED_OP" + ) + + +def _select_sync_simulator(kernel_name: str): + if not kernel_name: + return _simulate_fail + kn = kernel_name.lower() + if "matmul" in kn: + return _simulate_matmul_sync + elif "bmm" in kn: + return _simulate_bmm_sync + elif "layernorm" in kn: + return _simulate_layernorm_sync + elif "gelu" in kn: + return _simulate_gelu_sync + elif "softmax" in kn: + return _simulate_softmax_sync + # conv and other ops are not supported unless explicitly implemented + return _simulate_fail diff --git a/tests/test_api_integration.py b/tests/test_api_integration.py index 8a584cf..7e29e65 100644 --- a/tests/test_api_integration.py +++ b/tests/test_api_integration.py @@ -35,7 +35,16 @@ def server(): yield return - cmd = [sys.executable, "-m", "uvicorn", "backend_app.main:app", "--host", SERVER_HOST, "--port", str(SERVER_PORT)] + cmd = [ + sys.executable, + "-m", + "uvicorn", + "backend_app.main:app", + "--host", + SERVER_HOST, + "--port", + str(SERVER_PORT), + ] # redirect uvicorn output to artifact files out_path = ARTIFACT_DIR / "uvicorn.out" @@ -65,7 +74,9 @@ def server(): err = b"(no stderr available)" fout.close() ferr.close() - raise RuntimeError(f"uvicorn failed to start in time. stderr:\n{err.decode(errors='ignore')}") + raise RuntimeError( + f"uvicorn failed to start in time. 
stderr:\n{err.decode(errors='ignore')}" + ) try: yield @@ -85,7 +96,6 @@ def _url(path: str) -> str: return BASE.rstrip("/") + path -@pytest.mark.integration def test_health(): r = requests.get(_url("/health"), timeout=5) with open(ARTIFACT_DIR / "health.json", "w") as f: @@ -94,7 +104,6 @@ def test_health(): assert r.json().get("status") == "ok" -@pytest.mark.integration def test_supported_ops(): r = requests.get(_url("/supported_ops"), timeout=5) with open(ARTIFACT_DIR / "supported_ops.json", "w") as f: @@ -103,18 +112,41 @@ def test_supported_ops(): j = r.json() assert isinstance(j.get("supported_ops"), list) -@pytest.mark.integration -def test_create_task_and_poll_matmul(): - """Submit a matmul task and poll for completion; save artifacts for debugging.""" - payload = {"kernel_name": "itest_matmul", - "op": "matmul", - "input_dim": [[1, 2048], [2048, 7168]], - "dtype": ['c10::BFloat16', 'c10::BFloat16'], - "system_key": "A100_4_fp16"} +@pytest.mark.parametrize( + "matmul_payload", + [ + { + "kernel_name": "itest_matmul_M_1", + "op": "matmul", + "input_dim": [[1, 2048], [2048, 7168]], + "dtype": ["c10::BFloat16", "c10::BFloat16"], + "system_key": "A100_4_fp16", + }, + { + "kernel_name": "itest_matmul_M_128", + "op": "matmul", + "input_dim": [[128, 128], [128, 128]], + "dtype": ["c10::BFloat16", "c10::BFloat16"], + "system_key": "A100_4_fp16", + }, + { + "kernel_name": "itest_matmul_fp8_unsupported", + "op": "matmul", + "input_dim": [[128, 128], [128, 128]], + "dtype": ["c10::Float8_e4m3fn", "c10::Float8_e4m3fn"], + "system_key": "A100_4_fp16", + }, + ], +) +def test_create_task_and_poll_matmul(matmul_payload): + """Submit a matmul task and poll for completion; save artifacts for debugging.""" + payload = matmul_payload with open(ARTIFACT_DIR / "matmul_create_task_request.json", "w") as f: - json.dump({"url": _url("/tasks"), "payload": payload, "op": "matmul"}, f, indent=2) + json.dump( + {"url": _url("/tasks"), "payload": payload, "op": "matmul"}, f, indent=2 + 
) r = requests.post(_url("/tasks"), json=payload, timeout=5) with open(ARTIFACT_DIR / "matmul_create_task_response.json", "w") as f: @@ -141,8 +173,257 @@ def test_create_task_and_poll_matmul(): json.dump({"status_code": r.status_code, "body": info}, f, indent=2) if status == "done": assert "result" in info + simulated_time = info.get("result", {}).get("time_taken") + if payload.get("kernel_name") == "itest_matmul_fp8_unsupported": + # this kernel is expected to be unsupported + assert ( + info.get("result", {}).get("metadata").get("error_code") + ), "INVALID_INPUT" + elif payload.get("kernel_name") == "itest_matmul_M_1": + # this kernel is expected to be very fast + assert ( + simulated_time == 1.4408317802844531e-05 + ), f"simulated_time={simulated_time}" + elif payload.get("kernel_name") == "itest_matmul_M_128": + # this kernel is expected to be fast + assert ( + simulated_time == 1.1276595744680851e-07 + ), f"simulated_time={simulated_time}" break last = status time.sleep(1) assert last in ("done", "failed", "queued", None) + + +@pytest.mark.parametrize( + "bmm_payload", + [ + { + "kernel_name": "itest_bmm_M_1", + "op": "bmm", + # batch=1, m=1, k=2048 ; batch=1, k=2048, n=7168 + "input_dim": [[1, 1, 2048], [1, 2048, 7168]], + "dtype": ["c10::BFloat16", "c10::BFloat16"], + "system_key": "A100_4_fp16", + }, + { + "kernel_name": "itest_bmm_fp8_unsupported", + "op": "bmm", + "input_dim": [[128, 128, 128], [128, 128, 128]], + "dtype": ["c10::Float8_e4m3fn", "c10::Float8_e4m3fn"], + "system_key": "A100_4_fp16", + }, + ], +) +def test_create_task_and_poll_bmm(bmm_payload): + """Submit a batched-matmul (bmm) task and poll for completion; save artifacts for debugging.""" + payload = bmm_payload + + with open(ARTIFACT_DIR / "bmm_create_task_request.json", "w") as f: + json.dump({"url": _url("/tasks"), "payload": payload, "op": "bmm"}, f, indent=2) + + r = requests.post(_url("/tasks"), json=payload, timeout=5) + with open(ARTIFACT_DIR / "bmm_create_task_response.json", 
"w") as f: + try: + body = r.json() + except Exception: + body = {"text": r.text} + json.dump({"status_code": r.status_code, "body": body}, f, indent=2) + + assert r.status_code == 200 + j = r.json() + task_id = j.get("task_id") + assert task_id + + # poll briefly for terminal status + deadline = time.time() + 20 + last = None + while time.time() < deadline: + r = requests.get(_url(f"/tasks/{task_id}"), timeout=5) + if r.status_code == 200: + info = r.json() + status = info.get("status") + with open(ARTIFACT_DIR / f"task_{task_id}_poll_bmm.json", "w") as f: + json.dump({"status_code": r.status_code, "body": info}, f, indent=2) + if status == "done": + assert "result" in info + simulated_time = info.get("result", {}).get("time_taken") + if payload.get("kernel_name") == "itest_bmm_fp8_unsupported": + # this kernel is expected to be unsupported + assert ( + info.get("result", {}).get("metadata", {}).get("error_code") + == "INVALID_INPUT" + ) + else: + # for supported kernels we expect a numeric simulation time + assert simulated_time is not None + assert isinstance(simulated_time, (int, float)) + break + last = status + time.sleep(1) + + assert last in ("done", "failed", "queued", None) + + +@pytest.mark.parametrize( + "payload", + [ + { + "kernel_name": "itest_gelu_default", + "op": "gelu", + "input_dim": [1024], + "dtype": "fp16", + "system_key": "A100_4_fp16", + }, + ], +) +def test_create_task_and_poll_gelu(payload): + """Template test for gelu: submit and poll for completion.""" + with open(ARTIFACT_DIR / "gelu_create_task_request.json", "w") as f: + json.dump( + {"url": _url("/tasks"), "payload": payload, "op": "gelu"}, f, indent=2 + ) + + r = requests.post(_url("/tasks"), json=payload, timeout=5) + with open(ARTIFACT_DIR / "gelu_create_task_response.json", "w") as f: + try: + body = r.json() + except Exception: + body = {"text": r.text} + json.dump({"status_code": r.status_code, "body": body}, f, indent=2) + + assert r.status_code == 200 + j = r.json() + 
task_id = j.get("task_id") + assert task_id + + deadline = time.time() + 20 + last = None + while time.time() < deadline: + r = requests.get(_url(f"/tasks/{task_id}"), timeout=5) + if r.status_code == 200: + info = r.json() + status = info.get("status") + with open(ARTIFACT_DIR / f"task_{task_id}_poll_gelu.json", "w") as f: + json.dump({"status_code": r.status_code, "body": info}, f, indent=2) + if status == "done": + assert "result" in info + simulated_time = info.get("result", {}).get("time_taken") + assert simulated_time is not None + break + last = status + time.sleep(1) + assert last in ("done", "failed", "queued", None) + + +@pytest.mark.parametrize( + "payload", + [ + { + "kernel_name": "itest_layernorm_default", + "op": "layernorm", + "input_dim": [1, 1024, 7168], + "dtype": "fp16", + "system_key": "A100_4_fp16", + }, + { + "kernel_name": "itest_layernorm_unsupported", + "op": "layernorm", + "input_dim": [1, 1024, 7168], + "dtype": "fp8", + "system_key": "A100_4_fp16", + }, + ], +) +def test_create_task_and_poll_layernorm(payload): + """Template test for layernorm: submit and poll for completion.""" + with open(ARTIFACT_DIR / "layernorm_create_task_request.json", "w") as f: + json.dump( + {"url": _url("/tasks"), "payload": payload, "op": "layernorm"}, f, indent=2 + ) + + r = requests.post(_url("/tasks"), json=payload, timeout=5) + with open(ARTIFACT_DIR / "layernorm_create_task_response.json", "w") as f: + try: + body = r.json() + except Exception: + body = {"text": r.text} + json.dump({"status_code": r.status_code, "body": body}, f, indent=2) + + assert r.status_code == 200 + j = r.json() + task_id = j.get("task_id") + assert task_id + + deadline = time.time() + 20 + last = None + while time.time() < deadline: + r = requests.get(_url(f"/tasks/{task_id}"), timeout=5) + if r.status_code == 200: + info = r.json() + status = info.get("status") + with open(ARTIFACT_DIR / f"task_{task_id}_poll_layernorm.json", "w") as f: + json.dump({"status_code": 
r.status_code, "body": info}, f, indent=2) + if status == "done": + assert "result" in info + simulated_time = info.get("result", {}).get("time_taken") + if payload.get("kernel_name") == "itest_layernorm_default": + assert simulated_time is not None + elif payload.get("kernel_name") == "itest_layernorm_unsupported": + assert simulated_time is None + break + last = status + time.sleep(1) + assert last in ("done", "failed", "queued", None) + + +@pytest.mark.parametrize( + "payload", + [ + { + "kernel_name": "itest_softmax_default", + "op": "softmax", + "input_dim": [64, 128], + "dtype": "fp16", + "system_key": "A100_4_fp16", + }, + ], +) +def test_create_task_and_poll_softmax(payload): + """Template test for softmax: submit and poll for completion.""" + with open(ARTIFACT_DIR / "softmax_create_task_request.json", "w") as f: + json.dump( + {"url": _url("/tasks"), "payload": payload, "op": "softmax"}, f, indent=2 + ) + + r = requests.post(_url("/tasks"), json=payload, timeout=5) + with open(ARTIFACT_DIR / "softmax_create_task_response.json", "w") as f: + try: + body = r.json() + except Exception: + body = {"text": r.text} + json.dump({"status_code": r.status_code, "body": body}, f, indent=2) + + assert r.status_code == 200 + j = r.json() + task_id = j.get("task_id") + assert task_id + + deadline = time.time() + 20 + last = None + while time.time() < deadline: + r = requests.get(_url(f"/tasks/{task_id}"), timeout=5) + if r.status_code == 200: + info = r.json() + status = info.get("status") + with open(ARTIFACT_DIR / f"task_{task_id}_poll_softmax.json", "w") as f: + json.dump({"status_code": r.status_code, "body": info}, f, indent=2) + if status == "done": + assert "result" in info + simulated_time = info.get("result", {}).get("time_taken") + assert simulated_time is not None + break + last = status + time.sleep(1) + assert last in ("done", "failed", "queued", None) From 9915fabec05e557d47fc387701cc34b10483d6ce Mon Sep 17 00:00:00 2001 From: zhangt Date: Tue, 16 Sep 2025 
20:32:27 -0700 Subject: [PATCH 03/15] Update readme --- backend_app/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend_app/README.md b/backend_app/README.md index 52ed3db..cfae5ca 100644 --- a/backend_app/README.md +++ b/backend_app/README.md @@ -16,10 +16,10 @@ The backend is supported to run only via Docker. Build the image from the reposi sudo docker build -t llmcompass-backend . ``` -Run an interactive container with the repository mounted to `/app` (recommended for development): +Run the docker container, which exposes 8000 to host for backend interaction: ```bash -sudo docker run --rm -it -w /app --name llmcompass -v "$PWD":/app llmcompass-backend /bin/bash +sudo docker run --rm -p 8000:8000 llmcompass-backend ``` ## Environment variables From 081d905e015f5abec89e11485c45fa911d737f3e Mon Sep 17 00:00:00 2001 From: zhangt Date: Tue, 16 Sep 2025 21:04:34 -0700 Subject: [PATCH 04/15] Loose input dim format to Any --- backend_app/main.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/backend_app/main.py b/backend_app/main.py index d1b9c3d..84e0e69 100644 --- a/backend_app/main.py +++ b/backend_app/main.py @@ -3,7 +3,7 @@ import json import datetime import os -from typing import Any, List, Union, Optional +from typing import List, Union, Optional, Any from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException from pydantic import BaseModel @@ -41,8 +41,7 @@ async def lifespan(app: FastAPI): class KernelTask(BaseModel): kernel_name: str op: str - # allow either a flat shape like [M,K,N] or a pair of shapes for A and B - input_dim: Optional[Union[List[int], List[List[int]]]] = None + input_dim: Optional[Any] = None # some clients send a single dtype string, others send a list of dtype strings dtype: Optional[Union[str, List[str]]] = "fp32" # optional system key From 5c2e9691e48f46bfb156e4e64d671b19c2b8fe76 Mon Sep 17 00:00:00 2001 From: zhangt Date: Tue, 16 Sep 2025 21:23:44 
-0700 Subject: [PATCH 05/15] Fix bug and set default worker to 32 --- backend_app/main.py | 47 +++++++++++++++++++++++++++++----- backend_app/sync_simulators.py | 12 ++++----- 2 files changed, 46 insertions(+), 13 deletions(-) diff --git a/backend_app/main.py b/backend_app/main.py index 84e0e69..7a95360 100644 --- a/backend_app/main.py +++ b/backend_app/main.py @@ -16,19 +16,27 @@ async def lifespan(app: FastAPI): app.state.tasks = {} app.state.tasks_lock = asyncio.Lock() - # create queue and start background worker + # create queue and start background workers app.state.queue = asyncio.Queue() - app.state.worker_task = asyncio.create_task( - worker_loop(app.state.queue, app.state.tasks, app.state.tasks_lock) - ) + # number of concurrent background consumers (in-process). Use environment var WORKER_COUNT + try: + worker_count = int(os.environ.get("WORKER_COUNT", "32")) + except Exception: + worker_count = 1 + app.state.worker_tasks = [] + for i in range(max(1, worker_count)): + app.state.worker_tasks.append( + asyncio.create_task(worker_loop(app.state.queue, app.state.tasks, app.state.tasks_lock)) + ) try: yield finally: # shutdown: cancel background worker - worker = getattr(app.state, "worker_task", None) - if worker: + workers = getattr(app.state, "worker_tasks", None) or [] + for worker in workers: worker.cancel() + for worker in workers: try: await worker except asyncio.CancelledError: @@ -171,4 +179,29 @@ async def get_task(task_id: str): @app.get("/health") async def health(): - return {"status": "ok"} + # provide richer health info: queue size, worker tasks status, and task counts + queue = getattr(app.state, "queue", None) + workers = getattr(app.state, "worker_tasks", None) or [] + tasks_store = getattr(app.state, "tasks", None) or {} + + # summarize task states + counts = {"queued": 0, "running": 0, "done": 0, "failed": 0} + for entry in tasks_store.values(): + st = entry.get("status") + if st in counts: + counts[st] += 1 + + worker_info = [] + for w in 
workers: + try: + worker_info.append({"done": w.done(), "cancelled": w.cancelled()}) + except Exception: + worker_info.append({"done": None, "cancelled": None}) + + return { + "status": "ok", + "queue_length": queue.qsize() if queue is not None else None, + "worker_count": len(workers), + "workers": worker_info, + "task_counts": counts, + } diff --git a/backend_app/sync_simulators.py b/backend_app/sync_simulators.py index a23416e..20c4099 100644 --- a/backend_app/sync_simulators.py +++ b/backend_app/sync_simulators.py @@ -218,7 +218,7 @@ def _simulate_softmax_sync( def _simulate_fail( - kernel_name: str, _input_dim=None, _dtype_str: str = "" + kernel_name: str, _input_dim=None, _dtype_str: str = "", system_key: str = None, ) -> Dict[str, Any]: return _make_failure( kernel_name, "unsupported op - no generic simulator available", "UNSUPPORTED_OP" @@ -229,15 +229,15 @@ def _select_sync_simulator(kernel_name: str): if not kernel_name: return _simulate_fail kn = kernel_name.lower() - if "matmul" in kn: + if kn == "matmul": return _simulate_matmul_sync - elif "bmm" in kn: + elif kn == "bmm": return _simulate_bmm_sync - elif "layernorm" in kn: + elif kn == "layernorm" in kn: return _simulate_layernorm_sync - elif "gelu" in kn: + elif kn == "gelu": return _simulate_gelu_sync - elif "softmax" in kn: + elif kn == "softmax": return _simulate_softmax_sync # conv and other ops are not supported unless explicitly implemented return _simulate_fail From 3d76873399b46133d96e5c0897da625a77fba315 Mon Sep 17 00:00:00 2001 From: Terrence <39916879+TerrenceZhangX@users.noreply.github.com> Date: Wed, 17 Sep 2025 13:20:53 +0800 Subject: [PATCH 06/15] Update backend_app/sync_simulators.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend_app/sync_simulators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend_app/sync_simulators.py b/backend_app/sync_simulators.py index 20c4099..3cb4590 100644 --- a/backend_app/sync_simulators.py +++ 
b/backend_app/sync_simulators.py @@ -167,7 +167,7 @@ def _simulate_gelu_sync( latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") return { "status": "success", - "output": {"summary": "LayerNorm simulated"}, + "output": {"summary": "GeLU simulated"}, "time_taken": float(latency), "metadata": { "kernel_name": kernel_name, From 51b1f2a88aacef29280547c82ea208d1206e29f7 Mon Sep 17 00:00:00 2001 From: Terrence <39916879+TerrenceZhangX@users.noreply.github.com> Date: Wed, 17 Sep 2025 13:21:01 +0800 Subject: [PATCH 07/15] Update backend_app/sync_simulators.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend_app/sync_simulators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend_app/sync_simulators.py b/backend_app/sync_simulators.py index 3cb4590..86f23d8 100644 --- a/backend_app/sync_simulators.py +++ b/backend_app/sync_simulators.py @@ -207,7 +207,7 @@ def _simulate_softmax_sync( latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") return { "status": "success", - "output": {"summary": "LayerNorm simulated"}, + "output": {"summary": "Softmax simulated"}, "time_taken": float(latency), "metadata": { "kernel_name": kernel_name, From 5525d15e87d369e180d2440c3b0f41e25e458147 Mon Sep 17 00:00:00 2001 From: Terrence <39916879+TerrenceZhangX@users.noreply.github.com> Date: Wed, 17 Sep 2025 13:21:21 +0800 Subject: [PATCH 08/15] Update backend_app/README.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend_app/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend_app/README.md b/backend_app/README.md index cfae5ca..68c9be7 100644 --- a/backend_app/README.md +++ b/backend_app/README.md @@ -108,7 +108,7 @@ Task states may include `queued`, `running`, `done`, `failed` (scheduler/worker ## Code layout and runtime flow Key modules: -- `backend_app/simulator.py` — async entry points and dispatcher (`simulate_kernel_trace`, 
`process_kernel_simulation_task`). +- `backend_app/scheduler.py` — async entry points and dispatcher (`simulate_kernel_trace`, `process_kernel_simulation_task`). - `backend_app/sim_utils.py` — shared helpers: dtype mapping, tensor construction, unified failure response helper `_make_failure`. - `backend_app/sync_simulators.py` — synchronous `_simulate_*` implementations (e.g. `_simulate_matmul_sync`) and `_select_sync_simulator`. From 8a893f0c36e3aee24db92fec351bf03ad73e2f9f Mon Sep 17 00:00:00 2001 From: zhangt Date: Tue, 16 Sep 2025 22:24:02 -0700 Subject: [PATCH 09/15] update --- backend_app/main.py | 15 +++++++++++++-- backend_app/sync_simulators.py | 2 +- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/backend_app/main.py b/backend_app/main.py index 7a95360..5b1fa32 100644 --- a/backend_app/main.py +++ b/backend_app/main.py @@ -26,7 +26,7 @@ async def lifespan(app: FastAPI): app.state.worker_tasks = [] for i in range(max(1, worker_count)): app.state.worker_tasks.append( - asyncio.create_task(worker_loop(app.state.queue, app.state.tasks, app.state.tasks_lock)) + asyncio.create_task(worker_loop(app.state.queue, app.state.tasks, app.state.tasks_lock, worker_id=i)) ) try: @@ -56,7 +56,7 @@ class KernelTask(BaseModel): system_key: Optional[str] = None -async def worker_loop(queue: asyncio.Queue, tasks: dict, lock: asyncio.Lock): +async def worker_loop(queue: asyncio.Queue, tasks: dict, lock: asyncio.Lock, worker_id: int = 0): while True: task_id = await queue.get() try: @@ -66,6 +66,10 @@ async def worker_loop(queue: asyncio.Queue, tasks: dict, lock: asyncio.Lock): queue.task_done() continue payload = entry["payload"] + # mark as running and record worker id / start time + tasks[task_id]["status"] = "running" + tasks[task_id]["worker"] = worker_id + tasks[task_id]["started_at"] = datetime.datetime.utcnow().isoformat() # process (outside lock) result = await process_kernel_simulation_task(payload) async with lock: @@ -115,6 +119,13 @@ async def 
create_task(t: KernelTask, wait: bool = False, timeout: float = 30.0): # synchronous path: process inline with timeout try: + # mark as running for synchronous (wait) path + async with app.state.tasks_lock: + if task_id in app.state.tasks: + app.state.tasks[task_id]["status"] = "running" + app.state.tasks[task_id]["worker"] = "inline" + app.state.tasks[task_id]["started_at"] = datetime.datetime.utcnow().isoformat() + result = await asyncio.wait_for( process_kernel_simulation_task(payload), timeout=timeout ) diff --git a/backend_app/sync_simulators.py b/backend_app/sync_simulators.py index 86f23d8..85a2ecf 100644 --- a/backend_app/sync_simulators.py +++ b/backend_app/sync_simulators.py @@ -233,7 +233,7 @@ def _select_sync_simulator(kernel_name: str): return _simulate_matmul_sync elif kn == "bmm": return _simulate_bmm_sync - elif kn == "layernorm" in kn: + elif kn == "layernorm": return _simulate_layernorm_sync elif kn == "gelu": return _simulate_gelu_sync From 88fb3d4fde4ea1180f83044f4d75beecb343c1a4 Mon Sep 17 00:00:00 2001 From: Terrence <39916879+TerrenceZhangX@users.noreply.github.com> Date: Wed, 17 Sep 2025 13:24:57 +0800 Subject: [PATCH 10/15] Update backend_app/README.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend_app/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend_app/README.md b/backend_app/README.md index 68c9be7..44ad12e 100644 --- a/backend_app/README.md +++ b/backend_app/README.md @@ -133,7 +133,7 @@ def _simulate_conv_sync(kernel_name, input_dim, dtype_str, system_key=None): 2. Update `_select_sync_simulator` in the same file to return your function when appropriate (e.g. `if "conv" in kn:`). -3. Optionally add the op keyword to `get_supported_ops()` in `backend_app/simulator.py`. +3. Optionally add the op keyword to `get_supported_ops()` in `backend_app/sim_utils.py`. 4. Add unit and/or integration tests to cover happy-path and failure cases. 
From e2fb8fde246de72b7cb94d965e79b2d3e3d321dd Mon Sep 17 00:00:00 2001 From: Terrence <39916879+TerrenceZhangX@users.noreply.github.com> Date: Wed, 17 Sep 2025 13:27:11 +0800 Subject: [PATCH 11/15] Update backend_app/sync_simulators.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend_app/sync_simulators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend_app/sync_simulators.py b/backend_app/sync_simulators.py index 85a2ecf..c67ff62 100644 --- a/backend_app/sync_simulators.py +++ b/backend_app/sync_simulators.py @@ -87,7 +87,7 @@ def _simulate_bmm_sync( latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") return { "status": "success", - "output": {"summary": "matmul simulated"}, + "output": {"summary": "bmm simulated"}, "time_taken": float(latency), "metadata": { "kernel_name": kernel_name, From f82b085a739dd1b2ab99eb13010c76d8ba448181 Mon Sep 17 00:00:00 2001 From: Terrence <39916879+TerrenceZhangX@users.noreply.github.com> Date: Wed, 17 Sep 2025 13:27:21 +0800 Subject: [PATCH 12/15] Update tests/test_api_integration.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_api_integration.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_api_integration.py b/tests/test_api_integration.py index 7e29e65..cc62b11 100644 --- a/tests/test_api_integration.py +++ b/tests/test_api_integration.py @@ -177,8 +177,8 @@ def test_create_task_and_poll_matmul(matmul_payload): if payload.get("kernel_name") == "itest_matmul_fp8_unsupported": # this kernel is expected to be unsupported assert ( - info.get("result", {}).get("metadata").get("error_code") - ), "INVALID_INPUT" + info.get("result", {}).get("metadata").get("error_code") == "INVALID_INPUT" + ), f"error_code={info.get('result', {}).get('metadata').get('error_code')}" elif payload.get("kernel_name") == "itest_matmul_M_1": # this kernel is expected to be very fast assert ( From 
d178fbb09246e9cbb4d5772c1e83c193d6237797 Mon Sep 17 00:00:00 2001
From: zhangt
Date: Wed, 8 Oct 2025 21:16:26 -0700
Subject: [PATCH 13/15] Improve readme

---
 backend_app/README.md | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/backend_app/README.md b/backend_app/README.md
index 44ad12e..eae5f78 100644
--- a/backend_app/README.md
+++ b/backend_app/README.md
@@ -33,7 +33,13 @@ sudo docker run --rm -p 8000:8000 llmcompass-backend
 - `POST /tasks` — submit a simulation task (returns `task_id`)
 - `GET /tasks/{task_id}` — query task status and result
 
-Example task payload (matmul):
+### Usage
+
+1. **Submit a task**: POST the payload to the /tasks endpoint (for example `requests.post(f"{BASE}/tasks", json=payload, timeout=5)`). On success (HTTP 200) parse the response JSON and read the returned `task_id`.
+2. **Poll the task status**: GET `/tasks/{task_id}` (for example `requests.get(f"{BASE}/tasks/{task_id}", timeout=5)`) until the returned `status` is `done` (or `failed`). Handle non-200 responses and timeouts as needed.
+3. **Parse the result**: when the task `status` is `done`, inspect the `result` object. The `result.status` field indicates whether the simulation succeeded (e.g., `success`) or failed. The `simulated_time` field is the simulation duration in seconds. For failed tasks, the `failure_reason` object inside `result` describes the error.
+
+Below is an example task payload for a matmul operation. When constructing the payload, ensure `input_dim` follows the simulator’s expected format for this operation (e.g., a list of two two-element dimension arrays).
 
```json { @@ -45,7 +51,7 @@ Example task payload (matmul): } ``` -Example finished task (matmul): +Below is an example of a completed matmul task, illustrating the response fields and their structure: ```json { "status_code": 200, @@ -103,7 +109,7 @@ Example finished task (matmul): ``` -Task states may include `queued`, `running`, `done`, `failed` (scheduler/worker dependent). +Task states: `queued`, `running`, `done`, `failed` (scheduler/worker-dependent). When a task is submitted it enters the `queued` state. A free worker picks up the next queued task and the state transitions to `running`. After execution finishes, the task becomes `done` on success or `failed` on error. ## Code layout and runtime flow From 5b537f345bd1c6ebfdd61a7cd0c69398e703ec58 Mon Sep 17 00:00:00 2001 From: zhangt Date: Tue, 14 Oct 2025 21:59:12 -0700 Subject: [PATCH 14/15] Remove redundant information and improve field readability --- backend_app/README.md | 39 +++++++++++++++++----------------- backend_app/main.py | 2 +- backend_app/scheduler.py | 26 ++++++++++++++--------- backend_app/sync_simulators.py | 35 +++++------------------------- 4 files changed, 42 insertions(+), 60 deletions(-) diff --git a/backend_app/README.md b/backend_app/README.md index eae5f78..eb06326 100644 --- a/backend_app/README.md +++ b/backend_app/README.md @@ -64,26 +64,9 @@ Below is an example of a completed matmul task, illustrating the response fields "output": { "summary": "matmul simulated" }, - "time_taken": 1.4408317802844531e-05, - "metadata": { - "kernel_name": "itest_matmul_M_1", - "input_dim": [ - [ - 1, - 2048 - ], - [ - 2048, - 7168 - ] - ], - "dtype": [ - "c10::BFloat16", - "c10::BFloat16" - ] - } + "simulated_time": 1.4408317802844531e-05, }, - "payload": { + "user_submitted_request": { "kernel_name": "itest_matmul_M_1", "op": "matmul", "input_dim": [ @@ -108,6 +91,24 @@ Below is an example of a completed matmul task, illustrating the response fields } ``` +For failed tasks, the `result` includes a 
`failure_reason` object to aid root‑cause analysis. It typically contains fields like `kernel_name`, `error` (message), and `error_code`. Example: +```json +{ + ... + "result": { + "kernel_name": ..., + "status": "failed", + "output": null, + "simulated_time": null, + "failure_reason": { + "kernel_name": ..., + "error": "unsupported op - no generic simulator available", + "error_code": "UNSUPPORTED_OP" + } + }, + ... +} +``` Task states: `queued`, `running`, `done`, `failed` (scheduler/worker-dependent). When a task is submitted it enters the `queued` state. A free worker picks up the next queued task and the state transitions to `running`. After execution finishes, the task becomes `done` on success or `failed` on error. diff --git a/backend_app/main.py b/backend_app/main.py index 5b1fa32..617ef0d 100644 --- a/backend_app/main.py +++ b/backend_app/main.py @@ -182,7 +182,7 @@ async def get_task(task_id: str): "task_id": task_id, "status": status, "result": result, - "payload": payload, + "user_submitted_request": payload, "created_at": created_at, "updated_at": updated_at, } diff --git a/backend_app/scheduler.py b/backend_app/scheduler.py index 26d280a..9039034 100644 --- a/backend_app/scheduler.py +++ b/backend_app/scheduler.py @@ -13,7 +13,7 @@ async def simulate_kernel_trace( """ Dispatch to real software_model simulations. Blocking compile_and_simulate calls are executed in a thread via asyncio.to_thread so the event loop is not blocked. 
- Returns standardized result: {status, output, time_taken, metadata} + Returns standardized result: {status, output, simulated_time} """ # prefer op if provided (more specific), else fall back to kernel_name selector = op if op else kernel_name @@ -50,13 +50,19 @@ async def process_kernel_simulation_task(kernel_task: Dict[str, Any]) -> Dict[st ) end = time.time() # normalize to expected schema - out = { - "kernel_name": kernel_name, - "status": res.get("status", "failed"), - "output": res.get("output"), - "time_taken": res.get("time_taken", end - start), - "metadata": res.get( - "metadata", {"op": op, "input_dim": input_dim, "dtype": dtype} - ), - } + if res.get("status") == "failed": + out = { + "kernel_name": kernel_name, + "status": "failed", + "output": None, + "simulated_time": None, + "failure_reason": res.get("metadata", {}), + } + else: + out = { + "kernel_name": kernel_name, + "status": res.get("status"), + "output": res.get("output"), + "simulated_time": res.get("simulated_time"), + } return out diff --git a/backend_app/sync_simulators.py b/backend_app/sync_simulators.py index c67ff62..9ccc8fe 100644 --- a/backend_app/sync_simulators.py +++ b/backend_app/sync_simulators.py @@ -46,12 +46,7 @@ def _simulate_matmul_sync( return { "status": "success", "output": {"summary": "matmul simulated"}, - "time_taken": float(latency), - "metadata": { - "kernel_name": kernel_name, - "input_dim": input_dim, - "dtype": dtype_str, - }, + "simulated_time": float(latency), } @@ -88,12 +83,7 @@ def _simulate_bmm_sync( return { "status": "success", "output": {"summary": "bmm simulated"}, - "time_taken": float(latency), - "metadata": { - "kernel_name": kernel_name, - "input_dim": input_dim, - "dtype": dtype_str, - }, + "simulated_time": float(latency), } @@ -128,12 +118,7 @@ def _simulate_layernorm_sync( return { "status": "success", "output": {"summary": "LayerNorm simulated"}, - "time_taken": float(latency), - "metadata": { - "kernel_name": kernel_name, - "input_dim": 
input_dim, - "dtype": dtype_str, - }, + "simulated_time": float(latency), } @@ -168,12 +153,7 @@ def _simulate_gelu_sync( return { "status": "success", "output": {"summary": "GeLU simulated"}, - "time_taken": float(latency), - "metadata": { - "kernel_name": kernel_name, - "input_dim": input_dim, - "dtype": dtype_str, - }, + "simulated_time": float(latency), } @@ -208,12 +188,7 @@ def _simulate_softmax_sync( return { "status": "success", "output": {"summary": "Softmax simulated"}, - "time_taken": float(latency), - "metadata": { - "kernel_name": kernel_name, - "input_dim": input_dim, - "dtype": dtype_str, - }, + "simulated_time": float(latency), } From 65dec66aeed335174d42a96c83427767cf13b648 Mon Sep 17 00:00:00 2001 From: zhangt Date: Tue, 14 Oct 2025 22:16:56 -0700 Subject: [PATCH 15/15] Remove duplicated kernel name in response and update readme --- backend_app/README.md | 62 +++++++++++++++------------------- backend_app/scheduler.py | 4 +-- backend_app/sim_utils.py | 7 ++-- backend_app/sync_simulators.py | 42 +++++++++++------------ 4 files changed, 53 insertions(+), 62 deletions(-) diff --git a/backend_app/README.md b/backend_app/README.md index eb06326..f8fcf80 100644 --- a/backend_app/README.md +++ b/backend_app/README.md @@ -54,40 +54,36 @@ Below is an example task payload for a matmul operation. 
When constructing paylo Below is an example of a completed matmul task, illustrating the response fields and their structure: ```json { - "status_code": 200, - "body": { - "task_id": "089d0b13-2ef9-43e1-bde3-44ed7219e959", - "status": "done", - "result": { - "kernel_name": "itest_matmul_M_1", - "status": "success", - "output": { - "summary": "matmul simulated" - }, - "simulated_time": 1.4408317802844531e-05, + "task_id": "089d0b13-2ef9-43e1-bde3-44ed7219e959", + "status": "done", + "result": { + "status": "success", + "output": { + "summary": "matmul simulated" }, - "user_submitted_request": { - "kernel_name": "itest_matmul_M_1", - "op": "matmul", - "input_dim": [ - [ - 1, - 2048 - ], - [ - 2048, - 7168 - ] - ], - "dtype": [ - "c10::BFloat16", - "c10::BFloat16" - ], - "system_key": "A100_4_fp16" + "simulated_time": 1.4408317802844531e-05, }, - "created_at": "2025-09-17T02:23:11.777675", - "updated_at": "2025-09-17T02:23:11.778457" - } + "user_submitted_request": { + "kernel_name": "itest_matmul_M_1", + "op": "matmul", + "input_dim": [ + [ + 1, + 2048 + ], + [ + 2048, + 7168 + ] + ], + "dtype": [ + "c10::BFloat16", + "c10::BFloat16" + ], + "system_key": "A100_4_fp16" + }, + "created_at": "2025-09-17T02:23:11.777675", + "updated_at": "2025-09-17T02:23:11.778457" } ``` @@ -96,12 +92,10 @@ For failed tasks, the `result` includes a `failure_reason` object to aid root‑ { ... 
"result": { - "kernel_name": ..., "status": "failed", "output": null, "simulated_time": null, "failure_reason": { - "kernel_name": ..., "error": "unsupported op - no generic simulator available", "error_code": "UNSUPPORTED_OP" } diff --git a/backend_app/scheduler.py b/backend_app/scheduler.py index 9039034..4be1532 100644 --- a/backend_app/scheduler.py +++ b/backend_app/scheduler.py @@ -26,7 +26,7 @@ async def simulate_kernel_trace( # simulator returns dict; validate and propagate structured failure if not isinstance(result, dict): return _make_failure( - kernel_name, "simulator returned invalid result type", "SIMULATOR_ERROR" + "simulator returned invalid result type", "SIMULATOR_ERROR" ) # ensure failure results have error_code when possible if result.get("status") == "failed": @@ -52,7 +52,6 @@ async def process_kernel_simulation_task(kernel_task: Dict[str, Any]) -> Dict[st # normalize to expected schema if res.get("status") == "failed": out = { - "kernel_name": kernel_name, "status": "failed", "output": None, "simulated_time": None, @@ -60,7 +59,6 @@ async def process_kernel_simulation_task(kernel_task: Dict[str, Any]) -> Dict[st } else: out = { - "kernel_name": kernel_name, "status": res.get("status"), "output": res.get("output"), "simulated_time": res.get("simulated_time"), diff --git a/backend_app/sim_utils.py b/backend_app/sim_utils.py index 389537b..1f7e085 100644 --- a/backend_app/sim_utils.py +++ b/backend_app/sim_utils.py @@ -20,22 +20,21 @@ def _make_tensor(shape, dtype_obj): # centralized failure helper to avoid repeated dict literals -def _make_failure(kernel_name: str, error: str, error_code: str): +def _make_failure(error: str, error_code: str): return { "status": "failed", "output": None, "time_taken": None, "metadata": { - "kernel_name": kernel_name, "error": error, "error_code": error_code, }, } -def _make_missing_system(kernel_name: str, system_key: str): +def _make_missing_system(system_key: str): return _make_failure( - kernel_name, f"missing 
system configuration '{system_key}'", "NO_SYSTEM" + f"missing system configuration '{system_key}'", "NO_SYSTEM" ) diff --git a/backend_app/sync_simulators.py b/backend_app/sync_simulators.py index 9ccc8fe..e05a79f 100644 --- a/backend_app/sync_simulators.py +++ b/backend_app/sync_simulators.py @@ -25,10 +25,10 @@ def _simulate_matmul_sync( if dt is None: return _make_failure( - kernel_name, "invalid or unsupported dtype", "INVALID_INPUT" + "invalid or unsupported dtype", "INVALID_INPUT" ) elif A_shape is None or B_shape is None: - return _make_failure(kernel_name, "invalid input dimension", "INVALID_INPUT") + return _make_failure("invalid input dimension", "INVALID_INPUT") op = Matmul(dt) A = _make_tensor(A_shape, dt) @@ -36,10 +36,10 @@ def _simulate_matmul_sync( _ = op(A, B) if not system_key: - return _make_failure(kernel_name, "no valid system_key provided", "NO_SYSTEM") + return _make_failure("no valid system_key provided", "NO_SYSTEM") system = system_dict.get(system_key) if system is None: - return _make_missing_system(kernel_name, system_key) + return _make_missing_system(system_key) device = system.device latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") @@ -62,10 +62,10 @@ def _simulate_bmm_sync( if dt is None: return _make_failure( - kernel_name, "invalid or unsupported dtype", "INVALID_INPUT" + "invalid or unsupported dtype", "INVALID_INPUT" ) elif A_shape is None or B_shape is None: - return _make_failure(kernel_name, "invalid input dimension", "INVALID_INPUT") + return _make_failure("invalid input dimension", "INVALID_INPUT") op = BatchedMatmul(dt) A = _make_tensor(A_shape, dt) @@ -73,10 +73,10 @@ def _simulate_bmm_sync( _ = op(A, B) if not system_key: - return _make_failure(kernel_name, "no valid system_key provided", "NO_SYSTEM") + return _make_failure("no valid system_key provided", "NO_SYSTEM") system = system_dict.get(system_key) if system is None: - return _make_missing_system(kernel_name, system_key) + return 
_make_missing_system(system_key) device = system.device latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") @@ -98,20 +98,20 @@ def _simulate_layernorm_sync( if dt is None: return _make_failure( - kernel_name, "invalid or unsupported dtype", "INVALID_INPUT" + "invalid or unsupported dtype", "INVALID_INPUT" ) elif A_shape is None: - return _make_failure(kernel_name, "invalid input dimension", "INVALID_INPUT") + return _make_failure("invalid input dimension", "INVALID_INPUT") op = LayerNorm(dt) A = _make_tensor(A_shape, dt) _ = op(A) if not system_key: - return _make_failure(kernel_name, "no valid system_key provided", "NO_SYSTEM") + return _make_failure("no valid system_key provided", "NO_SYSTEM") system = system_dict.get(system_key) if system is None: - return _make_missing_system(kernel_name, system_key) + return _make_missing_system(system_key) device = system.device latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") @@ -133,20 +133,20 @@ def _simulate_gelu_sync( if dt is None: return _make_failure( - kernel_name, "invalid or unsupported dtype", "INVALID_INPUT" + "invalid or unsupported dtype", "INVALID_INPUT" ) elif A_shape is None: - return _make_failure(kernel_name, "invalid input dimension", "INVALID_INPUT") + return _make_failure("invalid input dimension", "INVALID_INPUT") op = GeLu(dt) A = _make_tensor(A_shape, dt) _ = op(A) if not system_key: - return _make_failure(kernel_name, "no valid system_key provided", "NO_SYSTEM") + return _make_failure("no valid system_key provided", "NO_SYSTEM") system = system_dict.get(system_key) if system is None: - return _make_missing_system(kernel_name, system_key) + return _make_missing_system(system_key) device = system.device latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") @@ -168,20 +168,20 @@ def _simulate_softmax_sync( if dt is None: return _make_failure( - kernel_name, "invalid or unsupported dtype", "INVALID_INPUT" + "invalid or unsupported dtype", 
"INVALID_INPUT" ) elif A_shape is None: - return _make_failure(kernel_name, "invalid input dimension", "INVALID_INPUT") + return _make_failure("invalid input dimension", "INVALID_INPUT") op = Softmax(dt) A = _make_tensor(A_shape, dt) _ = op(A) if not system_key: - return _make_failure(kernel_name, "no valid system_key provided", "NO_SYSTEM") + return _make_failure("no valid system_key provided", "NO_SYSTEM") system = system_dict.get(system_key) if system is None: - return _make_missing_system(kernel_name, system_key) + return _make_missing_system(system_key) device = system.device latency = op.compile_and_simulate(device, compile_mode="heuristic-GPU") @@ -196,7 +196,7 @@ def _simulate_fail( kernel_name: str, _input_dim=None, _dtype_str: str = "", system_key: str = None, ) -> Dict[str, Any]: return _make_failure( - kernel_name, "unsupported op - no generic simulator available", "UNSUPPORTED_OP" + "unsupported op - no generic simulator available", "UNSUPPORTED_OP" )