Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ Issues = "https://github.com/Attakay78/api-shield/issues"
[project.optional-dependencies]
fastapi = ["fastapi>=0.100"]
redis = ["redis[asyncio]>=5.0"]
dashboard = ["jinja2>=3.1", "aiofiles>=23.0"]
dashboard = [
"jinja2>=3.1",
"aiofiles>=23.0",
"python-multipart>=0.0.22",
]
cli = ["typer>=0.12"]
yaml = ["pyyaml>=6.0"]
toml = ["tomli-w>=1.0"]
Expand All @@ -47,6 +51,7 @@ all = [
"typer>=0.12",
"pyyaml>=6.0",
"tomli-w>=1.0",
"python-multipart>=0.0.22",
]
dev = [
"pytest>=8.0",
Expand Down
5 changes: 5 additions & 0 deletions shield/dashboard/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Shield dashboard — mountable HTMX admin UI."""

from shield.dashboard.app import ShieldDashboard

__all__ = ["ShieldDashboard"]
105 changes: 105 additions & 0 deletions shield/dashboard/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Shield dashboard — mountable Starlette admin UI factory."""

from __future__ import annotations

import importlib.metadata
from pathlib import Path
from typing import TYPE_CHECKING

from starlette.applications import Starlette
from starlette.routing import Route
from starlette.templating import Jinja2Templates

from shield.core.engine import ShieldEngine
from shield.dashboard import routes as r

if TYPE_CHECKING:
from starlette.types import ASGIApp

_TEMPLATES_DIR = Path(__file__).parent / "templates"


def ShieldDashboard(
engine: ShieldEngine,
prefix: str = "/shield",
auth: tuple[str, str] | None = None,
) -> ASGIApp:
"""Create a mountable Starlette admin UI for the Shield engine.

Mount it on any FastAPI / Starlette application::

app.mount("/shield", ShieldDashboard(engine=engine))

The dashboard is completely self-contained and does **not** affect the
parent application's routing, OpenAPI schema, or middleware stack.

Parameters
----------
engine:
The :class:`~shield.core.engine.ShieldEngine` whose state this
dashboard will display and control.
prefix:
The URL prefix at which the dashboard is mounted. Used to build
correct links inside templates. Should match the path passed to
``app.mount()``. Defaults to ``"/shield"``.
auth:
Optional ``(username, password)`` tuple. When provided, all
dashboard requests must carry a valid ``Authorization: Basic …``
header. Without credentials the dashboard is open to anyone who
can reach it.

Returns
-------
ASGIApp
A Starlette ASGI application. Pass it directly to
``app.mount()``.
"""
templates = Jinja2Templates(directory=str(_TEMPLATES_DIR))

# Register custom Jinja2 filter so templates can base64-encode path keys
# for safe embedding in URL segments.
import base64

templates.env.filters["encode_path"] = lambda p: (
base64.urlsafe_b64encode(p.encode()).decode().rstrip("=")
)
# Expose path_slug as a global so templates can call it without import.
templates.env.globals["path_slug"] = r.path_slug

try:
version = importlib.metadata.version("api-shield")
except importlib.metadata.PackageNotFoundError:
version = "0.1.0"

starlette_app = Starlette(
routes=[
Route("/", r.index),
Route("/routes", r.routes_partial),
Route("/modal/global/enable", r.modal_global_enable),
Route("/modal/global/disable", r.modal_global_disable),
Route("/modal/{action}/{path_key}", r.action_modal),
Route("/global-maintenance/enable", r.global_maintenance_enable, methods=["POST"]),
Route("/global-maintenance/disable", r.global_maintenance_disable, methods=["POST"]),
Route("/toggle/{path_key}", r.toggle, methods=["POST"]),
Route("/disable/{path_key}", r.disable, methods=["POST"]),
Route("/enable/{path_key}", r.enable, methods=["POST"]),
Route("/schedule", r.schedule, methods=["POST"]),
Route("/schedule/{path_key}", r.cancel_schedule, methods=["DELETE"]),
Route("/audit", r.audit_page),
Route("/audit/rows", r.audit_rows),
Route("/events", r.events),
],
)

# Inject shared state so route handlers can access engine, templates, etc.
starlette_app.state.engine = engine
starlette_app.state.templates = templates
starlette_app.state.prefix = prefix.rstrip("/")
starlette_app.state.version = version

if auth is not None:
from shield.dashboard.auth import BasicAuthMiddleware

return BasicAuthMiddleware(starlette_app, username=auth[0], password=auth[1])

return starlette_app
61 changes: 61 additions & 0 deletions shield/dashboard/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Optional HTTP Basic Auth middleware for the Shield dashboard."""

from __future__ import annotations

import base64
import binascii

from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette.types import ASGIApp


class BasicAuthMiddleware(BaseHTTPMiddleware):
"""Wrap the dashboard Starlette app with HTTP Basic Authentication.

When *auth* credentials are provided to :func:`ShieldDashboard`, this
middleware is added to the inner app. All requests must carry a valid
``Authorization: Basic …`` header, otherwise a ``401`` challenge is
returned.

Parameters
----------
app:
The ASGI application to protect.
username:
Expected username.
password:
Expected password.
"""

def __init__(self, app: ASGIApp, username: str, password: str) -> None:
super().__init__(app)
self._username = username
self._password = password

async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
"""Validate Basic Auth credentials before passing the request through."""
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Basic "):
return self._challenge()

try:
decoded = base64.b64decode(auth_header[6:]).decode("utf-8")
username, _, password = decoded.partition(":")
except (binascii.Error, UnicodeDecodeError):
return self._challenge()

if username != self._username or password != self._password:
return self._challenge()

return await call_next(request)

@staticmethod
def _challenge() -> Response:
"""Return a 401 Unauthorized response with a WWW-Authenticate challenge."""
return Response(
status_code=401,
headers={"WWW-Authenticate": 'Basic realm="Shield Dashboard"'},
content="Unauthorized",
)
Loading
Loading