From 7f9a9de7bb9dae16abdfbd040998403b89258df9 Mon Sep 17 00:00:00 2001 From: Joe Freeman Date: Fri, 30 Jan 2026 14:00:58 +0000 Subject: [PATCH 1/2] Infer projects from domain, and remove namespaces --- clients/python/coflux/__main__.py | 113 +---- clients/python/coflux/worker.py | 5 +- server/lib/coflux/application.ex | 6 +- server/lib/coflux/auth.ex | 61 +++ server/lib/coflux/auth/token_store.ex | 18 +- server/lib/coflux/config.ex | 28 +- server/lib/coflux/handlers/api.ex | 644 +++++++++++-------------- server/lib/coflux/handlers/auth.ex | 108 ----- server/lib/coflux/handlers/topics.ex | 54 ++- server/lib/coflux/handlers/utils.ex | 58 ++- server/lib/coflux/handlers/worker.ex | 78 +-- server/lib/coflux/project_store.ex | 76 +++ server/lib/coflux/projects.ex | 184 ------- server/lib/coflux/topic_utils.ex | 11 - server/lib/coflux/topics/logs.ex | 10 +- server/lib/coflux/topics/module.ex | 12 +- server/lib/coflux/topics/modules.ex | 12 +- server/lib/coflux/topics/pool.ex | 12 +- server/lib/coflux/topics/pools.ex | 12 +- server/lib/coflux/topics/projects.ex | 23 - server/lib/coflux/topics/run.ex | 12 +- server/lib/coflux/topics/search.ex | 12 +- server/lib/coflux/topics/sessions.ex | 12 +- server/lib/coflux/topics/workflow.ex | 12 +- server/lib/coflux/topics/workspaces.ex | 12 +- 25 files changed, 656 insertions(+), 929 deletions(-) create mode 100644 server/lib/coflux/auth.ex delete mode 100644 server/lib/coflux/handlers/auth.ex create mode 100644 server/lib/coflux/project_store.ex delete mode 100644 server/lib/coflux/projects.ex delete mode 100644 server/lib/coflux/topics/projects.ex diff --git a/clients/python/coflux/__main__.py b/clients/python/coflux/__main__.py index ee80eb51..24c484c4 100644 --- a/clients/python/coflux/__main__.py +++ b/clients/python/coflux/__main__.py @@ -93,7 +93,6 @@ def _api_request( def _create_session( host: str, - project_id: str, workspace_name: str, provides: dict[str, list[str]] | None = None, concurrency: int | None = None, @@ -101,7 +100,7 @@ def _create_session( *, secure: bool, ) -> str: - payload: dict[str, t.Any] = {"projectId": project_id, "workspaceName": workspace_name} + payload: dict[str, t.Any] = {"workspaceName": workspace_name} if provides: payload["provides"] = provides if concurrency: @@ -157,7 +156,6 @@ def _load_modules( def _register_manifests( - project_id: str, workspace_name: str, host: str, targets: dict[str, dict[str, tuple[models.Target, t.Callable]]], @@ -218,7 +216,6 @@ def _register_manifests( token, secure=secure, json={ - "projectId": project_id, "workspaceName": workspace_name, "manifests": manifests, }, @@ -226,7 +223,7 @@ def _register_manifests( def _get_pool( - host: str, project_id: str, workspace_name: str, pool_name: str, token: str | None, *, secure: bool + host: str, workspace_name: str, pool_name: str, token: str | None, *, secure: bool ) -> dict | None: try: return _api_request( @@ -236,7 +233,6 @@ def _get_pool( token, secure=secure, params={ - "project": project_id, "workspace": workspace_name, "pool": pool_name, }, @@ -272,7 +268,6 @@ def _print_table( def _init( *modules: types.ModuleType | str, - project: str, workspace: str, host: str, token: str | None, @@ -288,7 +283,7 @@ def _init( try: targets = _load_modules(list(modules)) if register: - _register_manifests(project, workspace, host, targets, token=token, secure=secure) + _register_manifests(workspace, host, targets, token=token, secure=secure) # Track whether we created the session (vs it being provided externally) session_provided = session_id is not None @@ -303,13 +298,12 @@ def _init( time.sleep(delay) print("Creating session...") session_id = _create_session( - host, project, workspace, provides, concurrency, token=token, secure=secure + host, workspace, provides, concurrency, token=token, secure=secure ) print("Session created.") try: with Worker( - project, workspace, host, secure, @@ -434,8 +428,8 @@ def _get_default_secure() -> bool | None: return _load_config().server.secure -def _server_options(token: bool = True): - """Add server options: host, secure, and optionally token.""" +def _project_options(token: bool = True, workspace: bool = False): + """Add project options: host, secure, and optionally token and workspace.""" def decorator(f): decorators = [ click.option( @@ -463,26 +457,6 @@ def decorator(f): default=_load_config().server.token, ) ) - for d in reversed(decorators): - f = d(f) - return f - return decorator - - -def _project_options(workspace: bool = False): - """Add project option, and optionally workspace.""" - def decorator(f): - decorators = [ - click.option( - "-p", - "--project", - help="Project ID", - envvar="COFLUX_PROJECT", - default=_load_config().project, - show_default=True, - required=True, - ), - ] if workspace: decorators.append( click.option( @@ -502,14 +476,6 @@ def decorator(f): @cli.command("configure") -@click.option( - "-p", - "--project", - help="Project ID", - default=_load_config().project, - show_default=True, - prompt=True, -) @click.option( "workspace", "-w", @@ -529,7 +495,6 @@ def decorator(f): ) def configure( host: str | None, - project: str | None, workspace: str | None, ): """ @@ -540,7 +505,6 @@ def configure( path = _config_path() data = _read_config(path) - data["project"] = project data["workspace"] = workspace data.setdefault("server", {})["host"] = host _write_config(path, data) @@ -560,9 +524,7 @@ def workspaces(): @workspaces.command("list") @_project_options() -@_server_options() def workspaces_list( - project: str, host: str, token: str | None, secure: bool | None, @@ -571,7 +533,7 @@ def workspaces_list( Lists workspaces. """ use_secure = _should_use_secure(host, secure) - workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure, params={"project": project}) + workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure) if workspaces: # TODO: draw as tree _print_table( @@ -588,14 +550,12 @@ def workspaces_list( @workspaces.command("create") @_project_options() -@_server_options() @click.option( "--base", help="The base workspace to inherit from", ) @click.argument("name") def workspaces_create( - project: str, host: str, token: str | None, secure: bool | None, @@ -608,7 +568,7 @@ def workspaces_create( use_secure = _should_use_secure(host, secure) base_id = None if base: - workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure, params={"project": project}) + workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure) workspace_ids_by_name = {w["name"]: id for id, w in workspaces.items()} base_id = workspace_ids_by_name.get(base) if not base_id: @@ -622,7 +582,6 @@ def workspaces_create( token, secure=use_secure, json={ - "projectId": project, "name": name, "baseId": base_id, }, @@ -632,7 +591,6 @@ def workspaces_create( @workspaces.command("update") @_project_options(workspace=True) -@_server_options() @click.option( "--name", help="The new name of the workspace", @@ -647,7 +605,6 @@ def workspaces_create( help="Unset the base workspace", ) def workspaces_update( - project: str, workspace: str, host: str, token: str | None, @@ -660,7 +617,7 @@ def workspaces_update( Updates a workspace within the project. """ use_secure = _should_use_secure(host, secure) - workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure, params={"project": project}) + workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure) workspace_ids_by_name = {w["name"]: id for id, w in workspaces.items()} workspace_id = workspace_ids_by_name.get(workspace) if not workspace_id: @@ -672,8 +629,7 @@ def workspaces_update( if not base_id: raise click.BadOptionUsage("base", "Not recognised") - payload = { - "projectId": project, + payload: dict[str, t.Any] = { "workspaceId": workspace_id, } if name is not None: @@ -692,9 +648,7 @@ def workspaces_update( @workspaces.command("archive") @_project_options(workspace=True) -@_server_options() def workspaces_archive( - project: str, workspace: str, host: str, token: str | None, @@ -704,7 +658,7 @@ def workspaces_archive( Archives a workspace. """ use_secure = _should_use_secure(host, secure) - workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure, params={"project": project}) + workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure) workspace_ids_by_name = {w["name"]: id for id, w in workspaces.items()} workspace_id = workspace_ids_by_name.get(workspace) if not workspace_id: @@ -717,7 +671,6 @@ def workspaces_archive( token, secure=use_secure, json={ - "projectId": project, "workspaceId": workspace_id, }, ) @@ -734,8 +687,7 @@ def pools(): @pools.command("list") @_project_options(workspace=True) -@_server_options() -def pools_list(project: str, workspace: str, host: str, token: str | None, secure: bool | None): +def pools_list(workspace: str, host: str, token: str | None, secure: bool | None): """ Lists pools. """ @@ -746,7 +698,7 @@ def pools_list(project: str, workspace: str, host: str, token: str | None, secur "get_pools", token, secure=use_secure, - json={"projectId": project, "workspaceName": workspace}, + params={"workspace": workspace}, ) if pools: _print_table( @@ -765,7 +717,6 @@ def pools_list(project: str, workspace: str, host: str, token: str | None, secur @pools.command("update") @_project_options(workspace=True) -@_server_options() @click.option( "modules", "-m", @@ -788,7 +739,6 @@ def pools_list(project: str, workspace: str, host: str, token: str | None, secur ) @click.argument("name") def pools_update( - project: str, workspace: str, host: str, token: str | None, @@ -803,7 +753,7 @@ def pools_update( Updates a pool. """ use_secure = _should_use_secure(host, secure) - pool = _get_pool(host, project, workspace, name, token, secure=use_secure) or {} + pool = _get_pool(host, workspace, name, token, secure=use_secure) or {} # TODO: support explicitly unsetting 'provides' (and modules, etc?) @@ -826,7 +776,6 @@ def pools_update( token, secure=use_secure, json={ - "projectId": project, "workspaceName": workspace, "poolName": name, "pool": pool, @@ -836,9 +785,8 @@ def pools_update( @pools.command("delete") @_project_options(workspace=True) -@_server_options() @click.argument("name") -def pools_delete(project: str, workspace: str, host: str, token: str | None, secure: bool | None, name: str): +def pools_delete(workspace: str, host: str, token: str | None, secure: bool | None, name: str): """ Deletes a pool. """ @@ -849,7 +797,7 @@ def pools_delete(project: str, workspace: str, host: str, token: str | None, sec "update_pool", token, secure=use_secure, - json={"projectId": project, "workspaceName": workspace, "poolName": name, "pool": None}, + json={"workspaceName": workspace, "poolName": name, "pool": None}, ) @@ -862,7 +810,7 @@ def blobs(): @blobs.command("get") -@_server_options(token=False) +@_project_options(token=False) @click.argument("key") def blobs_get(host: str, secure: bool | None, key: str): """ @@ -889,7 +837,7 @@ def assets(): pass -def _get_asset(host: str, project_id: str, asset_id: str, token: str | None, *, secure: bool) -> dict | None: +def _get_asset(host: str, asset_id: str, token: str | None, *, secure: bool) -> dict | None: try: return _api_request( "GET", @@ -898,7 +846,6 @@ def _get_asset(host: str, project_id: str, asset_id: str, token: str | None, *, token, secure=secure, params={ - "project": project_id, "asset": asset_id, }, ) @@ -922,20 +869,19 @@ def _human_size(bytes: int) -> str: @assets.command("inspect") @_project_options() -@_server_options() @click.option( "--match", help="Glob-style matcher to filter files", ) @click.argument("id") -def assets_inspect(project: str, host: str, token: str | None, secure: bool | None, match: str | None, id: str): +def assets_inspect(host: str, token: str | None, secure: bool | None, match: str | None, id: str): """ Inspect an asset. """ use_secure = _should_use_secure(host, secure) - asset = _get_asset(host, project, id, token, secure=use_secure) + asset = _get_asset(host, id, token, secure=use_secure) if not asset: - raise click.ClickException(f"Asset '{id}' not found in project") + raise click.ClickException(f"Asset '{id}' not found") click.echo(f"Name: {asset['name'] or '(untitled)'}") @@ -962,7 +908,6 @@ def assets_inspect(project: str, host: str, token: str | None, secure: bool | No @assets.command("download") @_project_options() -@_server_options() @click.option( "--to", type=click.Path(file_okay=False, path_type=Path, resolve_path=True), @@ -980,7 +925,6 @@ def assets_inspect(project: str, host: str, token: str | None, secure: bool | No ) @click.argument("id") def assets_download( - project: str, host: str, token: str | None, secure: bool | None, @@ -993,9 +937,9 @@ def assets_download( Downloads the contents of an asset. """ use_secure = _should_use_secure(host, secure) - asset = _get_asset(host, project, id, token, secure=use_secure) + asset = _get_asset(host, id, token, secure=use_secure) if not asset: - raise click.ClickException(f"Asset '{id}' not found in project") + raise click.ClickException(f"Asset '{id}' not found") entries = asset["entries"] if match: @@ -1033,10 +977,8 @@ def assets_download( @cli.command("register") @_project_options(workspace=True) -@_server_options() @click.argument("module_name", nargs=-1) def register( - project: str, workspace: str, host: str, token: str | None, @@ -1054,13 +996,12 @@ def register( raise click.ClickException("No module(s) specified.") use_secure = _should_use_secure(host, secure) targets = _load_modules(list(module_name)) - _register_manifests(project, workspace, host, targets, token=token, secure=use_secure) + _register_manifests(workspace, host, targets, token=token, secure=use_secure) click.secho("Manifest(s) registered.", fg="green") @cli.command("worker") @_project_options(workspace=True) -@_server_options() @click.option( "--provides", help="Features that this worker provides (to be matched with features that tasks require)", @@ -1101,7 +1042,6 @@ def register( ) @click.argument("module_name", nargs=-1) def worker( - project: str, workspace: str, host: str, token: str | None, @@ -1128,7 +1068,6 @@ def worker( use_secure = _should_use_secure(host, secure) args = (*module_name,) kwargs = { - "project": project, "workspace": workspace, "host": host, "token": token, @@ -1157,12 +1096,10 @@ def worker( @cli.command("submit") @_project_options(workspace=True) -@_server_options() @click.argument("module") @click.argument("target") @click.argument("argument", nargs=-1) def submit( - project: str, workspace: str, host: str, token: str | None, @@ -1183,7 +1120,6 @@ def submit( token, secure=use_secure, params={ - "project": project, "workspace": workspace, "module": module, "target": target, @@ -1200,7 +1136,6 @@ def submit( token, secure=use_secure, json={ - "projectId": project, "workspaceName": workspace, "module": module, "target": target, diff --git a/clients/python/coflux/worker.py b/clients/python/coflux/worker.py index 226f1a4b..38c9f834 100644 --- a/clients/python/coflux/worker.py +++ b/clients/python/coflux/worker.py @@ -44,7 +44,6 @@ class SessionExpiredError(Exception): class Worker: def __init__( self, - project_id: str, workspace_name: str, server_host: str, secure: bool, @@ -54,7 +53,6 @@ def __init__( session_id: str, targets: dict[str, dict[str, tuple[models.Target, t.Callable]]], ): - self._project_id = project_id self._workspace_name = workspace_name self._server_host = server_host self._secure = secure @@ -96,7 +94,6 @@ def _url(self, scheme: str, path: str, params: dict[str, str]) -> str: def _params(self): params = { - "project": self._project_id, "workspace": self._workspace_name, } if API_VERSION: @@ -112,7 +109,7 @@ async def run(self) -> None: """Run the worker. Raises SessionExpiredError if session expires.""" check_server(self._server_host, self._secure) while True: - print(f"Connecting ({self._server_host}, {self._project_id}, {self._workspace_name})...") + print(f"Connecting ({self._server_host}, {self._workspace_name})...") scheme = "wss" if self._secure else "ws" url = self._url(scheme, "worker", self._params()) try: diff --git a/server/lib/coflux/application.ex b/server/lib/coflux/application.ex index 430147e0..0527d0c3 100644 --- a/server/lib/coflux/application.ex +++ b/server/lib/coflux/application.ex @@ -1,7 +1,7 @@ defmodule Coflux.Application do use Application - alias Coflux.{Config, Projects, Orchestration, Topics} + alias Coflux.{Config, ProjectStore, Orchestration, Topics} alias Coflux.Auth.TokenStore @impl true @@ -13,7 +13,8 @@ defmodule Coflux.Application do children = [ TokenStore, - {Projects, name: Coflux.ProjectsServer}, + # ProjectStore only needed for subdomain routing (COFLUX_BASE_DOMAIN set) + if(Config.base_domain(), do: ProjectStore), # TODO: separate launch supervisor per project? (and specify max_children?) {Task.Supervisor, name: Coflux.LauncherSupervisor}, Orchestration.Supervisor, @@ -33,7 +34,6 @@ defmodule Coflux.Application do defp topics() do [ Topics.Sessions, - Topics.Projects, Topics.Workspaces, Topics.Modules, Topics.Run, diff --git a/server/lib/coflux/auth.ex b/server/lib/coflux/auth.ex new file mode 100644 index 00000000..6eb56d20 --- /dev/null +++ b/server/lib/coflux/auth.ex @@ -0,0 +1,61 @@ +defmodule Coflux.Auth do + @moduledoc """ + Handles token authentication. + + Tokens are stored in $COFLUX_DATA_DIR/tokens.json with format: + ```json + { + "": { + "projects": ["acme", "demo"] + } + } + ``` + + - Key: SHA-256 hash of token (hex, lowercase) + - projects: array of allowed project names (empty array = all projects) + + Auth mode is controlled by COFLUX_AUTH_MODE: + - "none" (default): No authentication required + - "token": Require valid token with project access + """ + + alias Coflux.Config + alias Coflux.Auth.TokenStore + + @doc """ + Checks if the given token is authorized for the project. + + Returns `:ok` when auth is disabled or token is valid. + Returns `{:error, :unauthorized}` otherwise. + """ + def check(token, project_id) do + case Config.auth_mode() do + :none -> :ok + :token -> validate_token(token, project_id) + end + end + + defp validate_token(nil, _project_id), do: {:error, :unauthorized} + + defp validate_token(token, project_id) do + token_hash = hash_token(token) + + case TokenStore.lookup(token_hash) do + {:ok, token_config} -> + # Empty projects list means access to all projects + if token_config.projects == [] or project_id in token_config.projects do + :ok + else + {:error, :unauthorized} + end + + :error -> + {:error, :unauthorized} + end + end + + defp hash_token(token) do + :crypto.hash(:sha256, token) + |> Base.encode16(case: :lower) + end +end diff --git a/server/lib/coflux/auth/token_store.ex b/server/lib/coflux/auth/token_store.ex index 87411625..c2c3b7e8 100644 --- a/server/lib/coflux/auth/token_store.ex +++ b/server/lib/coflux/auth/token_store.ex @@ -4,12 +4,24 @@ defmodule Coflux.Auth.TokenStore do Tokens are loaded from $COFLUX_DATA_DIR/tokens.json at startup. Reads go directly to ETS for performance. + + Token file format: + ```json + { + "": { + "projects": ["acme", "demo"] + } + } + ``` + + - Key: SHA-256 hash of token (hex, lowercase) + - projects: array of allowed project names (empty array = all projects) """ use GenServer defmodule TokenConfig do - defstruct namespaces: [nil] + defstruct projects: [] end @table :coflux_auth_tokens @@ -61,7 +73,7 @@ defmodule Coflux.Auth.TokenStore do end defp parse_config(config) do - namespaces = Map.get(config, "namespaces", [nil]) - %TokenConfig{namespaces: namespaces} + projects = Map.get(config, "projects", []) + %TokenConfig{projects: projects} end end diff --git a/server/lib/coflux/config.ex b/server/lib/coflux/config.ex index 75584f3f..d3991068 100644 --- a/server/lib/coflux/config.ex +++ b/server/lib/coflux/config.ex @@ -3,6 +3,20 @@ defmodule Coflux.Config do Caches configuration values read from environment variables at startup. Uses persistent_term for fast reads without copying. + + ## Project Configuration + + At least one of the following must be configured: + + - **COFLUX_PROJECT**: Restricts the server to a single project. All requests + are routed to this project. Supports any access method including IP addresses. + + - **COFLUX_BASE_DOMAIN**: Enables subdomain-based project routing. The project + is extracted from the subdomain (e.g., `acme.example.com` → project "acme"). + Requires subdomain access - direct IP or base domain access is not allowed. + + When both are set, subdomain routing is used but only the configured project + is allowed. """ @doc """ @@ -11,6 +25,7 @@ defmodule Coflux.Config do def init do :persistent_term.put(:coflux_data_dir, parse_data_dir()) :persistent_term.put(:coflux_auth_mode, parse_auth_mode()) + :persistent_term.put(:coflux_project, System.get_env("COFLUX_PROJECT")) :persistent_term.put(:coflux_base_domain, System.get_env("COFLUX_BASE_DOMAIN")) :persistent_term.put(:coflux_allowed_origins, parse_allowed_origins()) :ok @@ -31,7 +46,18 @@ defmodule Coflux.Config do end @doc """ - Returns the base domain for namespace resolution, or nil if not set. + Returns the configured project name, or nil if not set. + + When set, restricts the server to only serve this project. + """ + def project do + :persistent_term.get(:coflux_project) + end + + @doc """ + Returns the base domain for subdomain-based routing, or nil if not set. + + When set, the project is extracted from the request's subdomain. """ def base_domain do :persistent_term.get(:coflux_base_domain) diff --git a/server/lib/coflux/handlers/api.ex b/server/lib/coflux/handlers/api.ex index d747b26c..2d2ba900 100644 --- a/server/lib/coflux/handlers/api.ex +++ b/server/lib/coflux/handlers/api.ex @@ -1,20 +1,10 @@ defmodule Coflux.Handlers.Api do import Coflux.Handlers.Utils - alias Coflux.{Orchestration, Projects, MapUtils, Version} - alias Coflux.Handlers.Auth + alias Coflux.{Auth, Orchestration, Config, ProjectStore, MapUtils, Version} - @projects_server Coflux.ProjectsServer @max_parameters 20 - # Helper to check project access and return appropriate error response - defp with_project_access(req, project_id, namespace, fun) do - case Projects.get_project_by_id(@projects_server, project_id, namespace) do - {:ok, _project} -> fun.() - :error -> json_error_response(req, "not_found", status: 404) - end - end - def init(req, opts) do req = set_cors_headers(req) @@ -32,15 +22,32 @@ defmodule Coflux.Handlers.Api do {:ok, req, opts} method -> - with {:ok, namespace} <- resolve_namespace(req), - :ok <- Auth.check(req, namespace) do - req = handle(req, method, :cowboy_req.path_info(req), namespace) + with {:ok, project_id} <- resolve_project(req), + :ok <- validate_project(project_id), + :ok <- Auth.check(get_token(req), project_id) do + req = handle(req, method, :cowboy_req.path_info(req), project_id) {:ok, req, opts} else + {:error, :not_configured} -> + req = json_error_response(req, "not_configured", status: 500) + {:ok, req, opts} + {:error, :invalid_host} -> req = json_error_response(req, "invalid_host", status: 400) {:ok, req, opts} + {:error, :project_required} -> + req = json_error_response(req, "project_required", status: 400) + {:ok, req, opts} + + {:error, :project_mismatch} -> + req = json_error_response(req, "project_mismatch", status: 403) + {:ok, req, opts} + + {:error, :project_not_found} -> + req = json_error_response(req, "project_not_found", status: 404) + {:ok, req, opts} + {:error, :unauthorized} -> req = json_error_response(req, "unauthorized", status: 401) {:ok, req, opts} @@ -58,59 +65,48 @@ defmodule Coflux.Handlers.Api do end end - defp handle(req, "POST", ["create_project"], namespace) do - {:ok, arguments, errors, req} = - read_arguments(req, %{project_name: "projectName"}) - - if Enum.empty?(errors) do - case Projects.create_project(@projects_server, arguments.project_name, namespace) do - {:ok, project_id} -> - json_response(req, %{"projectId" => project_id}) + # Validate that the resolved project is allowed + # When using subdomain routing (COFLUX_BASE_DOMAIN set), check the whitelist + # Otherwise, all projects are allowed + defp validate_project(nil), do: :ok - {:error, errors} -> - errors = - MapUtils.translate_keys(errors, %{ - project_name: "projectName" - }) - - json_error_response(req, "bad_request", details: errors) + defp validate_project(project_id) do + if Config.base_domain() do + if ProjectStore.exists?(project_id) do + :ok + else + {:error, :project_not_found} end else - json_error_response(req, "bad_request", details: errors) + :ok end end - defp handle(req, "GET", ["get_workspaces"], namespace) do - qs = :cowboy_req.parse_qs(req) - project_id = get_query_param(qs, "project") - - with_project_access(req, project_id, namespace, fn -> - case Orchestration.get_workspaces(project_id) do - {:ok, workspaces} -> - json_response( - req, - Map.new(workspaces, fn {workspace_id, workspace} -> - base_id = - if workspace.base_id, - do: Integer.to_string(workspace.base_id) - - {workspace_id, - %{ - "name" => workspace.name, - "baseId" => base_id - }} - end) - ) - end - end) + defp handle(req, "GET", ["get_workspaces"], project_id) do + case Orchestration.get_workspaces(project_id) do + {:ok, workspaces} -> + json_response( + req, + Map.new(workspaces, fn {workspace_id, workspace} -> + base_id = + if workspace.base_id, + do: Integer.to_string(workspace.base_id) + + {workspace_id, + %{ + "name" => workspace.name, + "baseId" => base_id + }} + end) + ) + end end - defp handle(req, "POST", ["create_workspace"], namespace) do + defp handle(req, "POST", ["create_workspace"], project_id) do {:ok, arguments, errors, req} = read_arguments( req, %{ - project_id: "projectId", name: "name" }, %{ @@ -119,36 +115,33 @@ defmodule Coflux.Handlers.Api do ) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.create_workspace( - arguments.project_id, - arguments.name, - arguments[:base_id] - ) do - {:ok, workspace_id} -> - json_response(req, %{id: workspace_id}) + case Orchestration.create_workspace( + project_id, + arguments.name, + arguments[:base_id] + ) do + {:ok, workspace_id} -> + json_response(req, %{id: workspace_id}) - {:error, errors} -> - errors = - MapUtils.translate_keys(errors, %{ - name: "name", - base_id: "baseId" - }) + {:error, errors} -> + errors = + MapUtils.translate_keys(errors, %{ + name: "name", + base_id: "baseId" + }) - json_error_response(req, "bad_request", details: errors) - end - end) + json_error_response(req, "bad_request", details: errors) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "POST", ["update_workspace"], namespace) do + defp handle(req, "POST", ["update_workspace"], project_id) do {:ok, arguments, errors, req} = read_arguments( req, %{ - project_id: "projectId", workspace_id: {"workspaceId", &parse_numeric_id/1} }, %{ @@ -158,173 +151,150 @@ defmodule Coflux.Handlers.Api do ) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.update_workspace( - arguments.project_id, - arguments.workspace_id, - Map.take(arguments, [:name, :base_id]) - ) do - :ok -> - :cowboy_req.reply(204, req) + case Orchestration.update_workspace( + project_id, + arguments.workspace_id, + Map.take(arguments, [:name, :base_id]) + ) do + :ok -> + :cowboy_req.reply(204, req) - {:error, :not_found} -> - json_error_response(req, "not_found", status: 404) + {:error, :not_found} -> + json_error_response(req, "not_found", status: 404) - {:error, errors} -> - errors = - MapUtils.translate_keys(errors, %{ - name: "name", - base_id: "baseId" - }) + {:error, errors} -> + errors = + MapUtils.translate_keys(errors, %{ + name: "name", + base_id: "baseId" + }) - json_error_response(req, "bad_request", details: errors) - end - end) + json_error_response(req, "bad_request", details: errors) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "POST", ["pause_workspace"], namespace) do + defp handle(req, "POST", ["pause_workspace"], project_id) do {:ok, arguments, errors, req} = read_arguments(req, %{ - project_id: "projectId", workspace_id: {"workspaceId", &parse_numeric_id/1} }) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.pause_workspace( - arguments.project_id, - arguments.workspace_id - ) do - :ok -> - :cowboy_req.reply(204, req) + case Orchestration.pause_workspace( + project_id, + arguments.workspace_id + ) do + :ok -> + :cowboy_req.reply(204, req) - {:error, :not_found} -> - json_error_response(req, "not_found", status: 404) - end - end) + {:error, :not_found} -> + json_error_response(req, "not_found", status: 404) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "POST", ["resume_workspace"], namespace) do + defp handle(req, "POST", ["resume_workspace"], project_id) do {:ok, arguments, errors, req} = read_arguments(req, %{ - project_id: "projectId", workspace_id: {"workspaceId", &parse_numeric_id/1} }) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.resume_workspace( - arguments.project_id, - arguments.workspace_id - ) do - :ok -> - :cowboy_req.reply(204, req) + case Orchestration.resume_workspace( + project_id, + arguments.workspace_id + ) do + :ok -> + :cowboy_req.reply(204, req) - {:error, :not_found} -> - json_error_response(req, "not_found", status: 404) - end - end) + {:error, :not_found} -> + json_error_response(req, "not_found", status: 404) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "POST", ["archive_workspace"], namespace) do + defp handle(req, "POST", ["archive_workspace"], project_id) do {:ok, arguments, errors, req} = read_arguments(req, %{ - project_id: "projectId", workspace_id: {"workspaceId", &parse_numeric_id/1} }) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.archive_workspace( - arguments.project_id, - arguments.workspace_id - ) do - :ok -> - :cowboy_req.reply(204, req) + case Orchestration.archive_workspace( + project_id, + arguments.workspace_id + ) do + :ok -> + :cowboy_req.reply(204, req) - {:error, :descendants} -> - json_error_response(req, "bad_request", details: %{"workspaceId" => "has_dependencies"}) + {:error, :descendants} -> + json_error_response(req, "bad_request", details: %{"workspaceId" => "has_dependencies"}) - {:error, :not_found} -> - json_error_response(req, "not_found", status: 404) - end - end) + {:error, :not_found} -> + json_error_response(req, "not_found", status: 404) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "GET", ["get_pools"], namespace) do - {:ok, arguments, errors, req} = - read_arguments(req, %{ - project_id: "projectId", - workspace_name: "workspaceName" - }) + defp handle(req, "GET", ["get_pools"], project_id) do + qs = :cowboy_req.parse_qs(req) + workspace_name = get_query_param(qs, "workspace") - if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.get_pools(arguments.project_id, arguments.workspace_name) do - {:ok, pools} -> - json_response( - req, - Map.new(pools, fn {pool_name, pool} -> - { - pool_name, - %{ - "provides" => pool.provides, - "modules" => pool.modules, - "launcherType" => if(pool.launcher, do: pool.launcher.type) - } - } - end) - ) + case Orchestration.get_pools(project_id, workspace_name) do + {:ok, pools} -> + json_response( + req, + Map.new(pools, fn {pool_name, pool} -> + { + pool_name, + %{ + "provides" => pool.provides, + "modules" => pool.modules, + "launcherType" => if(pool.launcher, do: pool.launcher.type) + } + } + end) + ) - {:error, :workspace_invalid} -> - json_error_response(req, "workspace_not_found", status: 404) - end - end) - else - json_error_response(req, "bad_request", details: errors) + {:error, :workspace_invalid} -> + json_error_response(req, "workspace_not_found", status: 404) end end - defp handle(req, "GET", ["get_pool"], namespace) do + defp handle(req, "GET", ["get_pool"], project_id) do qs = :cowboy_req.parse_qs(req) - project_id = get_query_param(qs, "project") workspace_name = get_query_param(qs, "workspace") pool_name = get_query_param(qs, "pool") - with_project_access(req, project_id, namespace, fn -> - case Orchestration.get_pools(project_id, workspace_name) do - {:ok, pools} -> - case Map.fetch(pools, pool_name) do - {:ok, pool} -> - json_response( - req, - %{ - "provides" => pool.provides, - "modules" => pool.modules, - "launcher" => format_launcher(pool.launcher) - } - ) - - :error -> - json_error_response(req, "not_found", status: 404) - end + case Orchestration.get_pools(project_id, workspace_name) do + {:ok, pools} -> + case Map.fetch(pools, pool_name) do + {:ok, pool} -> + json_response( + req, + %{ + "provides" => pool.provides, + "modules" => pool.modules, + "launcher" => format_launcher(pool.launcher) + } + ) - {:error, :workspace_invalid} -> - json_error_response(req, "workspace_not_found", status: 404) - end - end) + :error -> + json_error_response(req, "not_found", status: 404) + end + + {:error, :workspace_invalid} -> + json_error_response(req, "workspace_not_found", status: 404) + end end defp format_launcher(nil), do: nil @@ -339,161 +309,142 @@ defmodule Coflux.Handlers.Api do defp maybe_put(map, _key, nil), do: map defp maybe_put(map, key, value), do: Map.put(map, key, value) - defp handle(req, "POST", ["update_pool"], namespace) do + defp handle(req, "POST", ["update_pool"], project_id) do {:ok, arguments, errors, req} = read_arguments(req, %{ - project_id: "projectId", workspace_name: "workspaceName", pool_name: {"poolName", &parse_pool_name/1}, pool: {"pool", &parse_pool/1} }) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.update_pool( - arguments.project_id, - arguments.workspace_name, - arguments.pool_name, - arguments.pool - ) do - :ok -> - :cowboy_req.reply(204, req) + case Orchestration.update_pool( + project_id, + arguments.workspace_name, + arguments.pool_name, + arguments.pool + ) do + :ok -> + :cowboy_req.reply(204, req) - {:error, :not_found} -> - json_error_response(req, "not_found", status: 404) - end - end) + {:error, :not_found} -> + json_error_response(req, "not_found", status: 404) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "POST", ["stop_worker"], namespace) do + defp handle(req, "POST", ["stop_worker"], project_id) do {:ok, arguments, errors, req} = read_arguments(req, %{ - project_id: "projectId", workspace_name: "workspaceName", worker_id: {"workerId", &parse_numeric_id/1} }) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.stop_worker( - arguments.project_id, - arguments.workspace_name, - arguments.worker_id - ) do - :ok -> - :cowboy_req.reply(204, req) + case Orchestration.stop_worker( + project_id, + arguments.workspace_name, + arguments.worker_id + ) do + :ok -> + :cowboy_req.reply(204, req) - {:error, :not_found} -> - json_error_response(req, "not_found", status: 404) - end - end) + {:error, :not_found} -> + json_error_response(req, "not_found", status: 404) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "POST", ["resume_worker"], namespace) do + defp handle(req, "POST", ["resume_worker"], project_id) do {:ok, arguments, errors, req} = read_arguments(req, %{ - project_id: "projectId", workspace_name: "workspaceName", worker_id: {"workerId", &parse_numeric_id/1} }) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.resume_worker( - arguments.project_id, - arguments.workspace_name, - arguments.worker_id - ) do - :ok -> - :cowboy_req.reply(204, req) + case Orchestration.resume_worker( + project_id, + arguments.workspace_name, + arguments.worker_id + ) do + :ok -> + :cowboy_req.reply(204, req) - {:error, :not_found} -> - json_error_response(req, "not_found", status: 404) - end - end) + {:error, :not_found} -> + json_error_response(req, "not_found", status: 404) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "POST", ["register_manifests"], namespace) do + defp handle(req, "POST", ["register_manifests"], project_id) do {:ok, arguments, errors, req} = read_arguments(req, %{ - project_id: "projectId", workspace_name: "workspaceName", manifests: {"manifests", &parse_manifests/1} }) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.register_manifests( - arguments.project_id, - arguments.workspace_name, - arguments.manifests - ) do - :ok -> - :cowboy_req.reply(204, req) - end - end) + case Orchestration.register_manifests( + project_id, + arguments.workspace_name, + arguments.manifests + ) do + :ok -> + :cowboy_req.reply(204, req) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "POST", ["archive_module"], namespace) do + defp handle(req, "POST", ["archive_module"], project_id) do {:ok, arguments, errors, req} = read_arguments(req, %{ - project_id: "projectId", workspace_name: "workspaceName", module_name: "moduleName" }) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.archive_module( - arguments.project_id, - arguments.workspace_name, - arguments.module_name - ) do - :ok -> - :cowboy_req.reply(204, req) - end - end) + case Orchestration.archive_module( + project_id, + arguments.workspace_name, + arguments.module_name + ) do + :ok -> + :cowboy_req.reply(204, req) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "GET", ["get_workflow"], namespace) do + defp handle(req, "GET", ["get_workflow"], project_id) do qs = :cowboy_req.parse_qs(req) - project_id = get_query_param(qs, "project") workspace_name = get_query_param(qs, "workspace") module = get_query_param(qs, "module") target_name = get_query_param(qs, "target") - with_project_access(req, project_id, namespace, fn -> - case Orchestration.get_workflow(project_id, workspace_name, module, target_name) do - {:ok, nil} -> - json_error_response(req, "not_found", status: 404) + case Orchestration.get_workflow(project_id, workspace_name, module, target_name) do + {:ok, nil} -> + json_error_response(req, "not_found", status: 404) - {:ok, workflow} -> - json_response(req, compose_workflow(workflow)) - end - end) + {:ok, workflow} -> + json_response(req, compose_workflow(workflow)) + end end - defp handle(req, "POST", ["submit_workflow"], namespace) do + defp handle(req, "POST", ["submit_workflow"], project_id) do {:ok, arguments, errors, req} = read_arguments( req, %{ - project_id: "projectId", module: "module", target: "target", workspace_name: "workspaceName", @@ -511,39 +462,36 @@ defmodule Coflux.Handlers.Api do ) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.start_run( - arguments.project_id, - arguments.module, - arguments.target, - :workflow, - arguments.arguments, - workspace: arguments.workspace_name, - wait_for: arguments[:wait_for], - cache: arguments[:cache], - defer: arguments[:defer], - delay: arguments[:delay] || 0, - retries: arguments[:retries], - recurrent: arguments[:recurrent] == true, - requires: arguments[:requires] - ) do - {:ok, run_id, step_id, execution_id} -> - json_response(req, %{ - "runId" => run_id, - "stepId" => step_id, - "executionId" => execution_id - }) - end - end) + case Orchestration.start_run( + project_id, + arguments.module, + arguments.target, + :workflow, + arguments.arguments, + workspace: arguments.workspace_name, + wait_for: arguments[:wait_for], + cache: arguments[:cache], + defer: arguments[:defer], + delay: arguments[:delay] || 0, + retries: arguments[:retries], + recurrent: arguments[:recurrent] == true, + requires: arguments[:requires] + ) do + {:ok, run_id, step_id, execution_id} -> + json_response(req, %{ + "runId" => run_id, + "stepId" => step_id, + "executionId" => execution_id + }) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "POST", ["cancel_execution"], namespace) do + defp handle(req, "POST", ["cancel_execution"], project_id) do {:ok, arguments, errors, req} = read_arguments(req, %{ - project_id: "projectId", execution_id: "executionId" }) @@ -551,90 +499,78 @@ defmodule Coflux.Handlers.Api do execution_id = String.to_integer(arguments.execution_id) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.cancel_execution( - arguments.project_id, - execution_id - ) do - :ok -> - json_response(req, %{}) - end - end) + case Orchestration.cancel_execution( + project_id, + execution_id + ) do + :ok -> + json_response(req, %{}) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "POST", ["rerun_step"], namespace) do + defp handle(req, "POST", ["rerun_step"], project_id) do {:ok, arguments, errors, req} = read_arguments(req, %{ - project_id: "projectId", workspace_name: "workspaceName", step_id: "stepId" }) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.rerun_step( - arguments.project_id, - arguments.step_id, - arguments.workspace_name - ) do - {:ok, execution_id, attempt} -> - json_response(req, %{"executionId" => execution_id, "attempt" => attempt}) + case Orchestration.rerun_step( + project_id, + arguments.step_id, + arguments.workspace_name + ) do + {:ok, execution_id, attempt} -> + json_response(req, %{"executionId" => execution_id, "attempt" => attempt}) - {:error, :workspace_invalid} -> - json_error_response(req, "bad_request", details: %{"workspace" => "invalid"}) - end - end) + {:error, :workspace_invalid} -> + json_error_response(req, "bad_request", details: %{"workspace" => "invalid"}) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, "GET", ["search"], namespace) do + defp handle(req, "GET", ["search"], project_id) do qs = :cowboy_req.parse_qs(req) - project_id = get_query_param(qs, "project") # TODO: handle parse error {:ok, workspace_id} = parse_numeric_id(get_query_param(qs, "workspaceId")) query = get_query_param(qs, "query") - with_project_access(req, project_id, namespace, fn -> - case Topical.execute( - Coflux.TopicalRegistry, - ["projects", project_id, "search", workspace_id], - "query", - {query}, - %{namespace: namespace} - ) do - {:ok, matches} -> - json_response(req, %{"matches" => matches}) - end - end) + case Topical.execute( + Coflux.TopicalRegistry, + ["search", workspace_id], + "query", + {query}, + %{project: project_id} + ) do + {:ok, matches} -> + json_response(req, %{"matches" => matches}) + end end - defp handle(req, "GET", ["get_asset"], namespace) do + defp handle(req, "GET", ["get_asset"], project_id) do qs = :cowboy_req.parse_qs(req) - project_id = get_query_param(qs, "project") asset_id = get_query_param(qs, "asset") - with_project_access(req, project_id, namespace, fn -> - case Orchestration.get_asset_by_external_id(project_id, asset_id) do - {:error, :not_found} -> - json_error_response(req, "not_found", status: 404) + case Orchestration.get_asset_by_external_id(project_id, asset_id) do + {:error, :not_found} -> + json_error_response(req, "not_found", status: 404) - {:ok, name, entries} -> - json_response(req, compose_asset(name, entries)) - end - end) + {:ok, name, entries} -> + json_response(req, compose_asset(name, entries)) + end end - defp handle(req, "POST", ["create_session"], namespace) do + defp handle(req, "POST", ["create_session"], project_id) do {:ok, arguments, errors, req} = read_arguments( req, %{ - project_id: "projectId", workspace_name: "workspaceName" }, %{ @@ -644,28 +580,26 @@ defmodule Coflux.Handlers.Api do ) if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - opts = - [ - provides: arguments[:provides], - concurrency: arguments[:concurrency] - ] - |> Enum.reject(fn {_, v} -> is_nil(v) end) + opts = + [ + provides: arguments[:provides], + concurrency: arguments[:concurrency] + ] + |> Enum.reject(fn {_, v} -> is_nil(v) end) - case Orchestration.create_session(arguments.project_id, arguments.workspace_name, opts) do - {:ok, session_id} -> - json_response(req, %{"sessionId" => session_id}) + case Orchestration.create_session(project_id, arguments.workspace_name, opts) do + {:ok, session_id} -> + json_response(req, %{"sessionId" => session_id}) - {:error, :workspace_invalid} -> - json_error_response(req, "not_found", status: 404) - end - end) + {:error, :workspace_invalid} -> + json_error_response(req, "not_found", status: 404) + end else json_error_response(req, "bad_request", details: errors) end end - defp handle(req, _method, _path, _namespace) do + defp handle(req, _method, _path, _project) do json_error_response(req, "not_found", status: 404) end diff --git a/server/lib/coflux/handlers/auth.ex b/server/lib/coflux/handlers/auth.ex deleted file mode 100644 index 9335f6d5..00000000 --- a/server/lib/coflux/handlers/auth.ex +++ /dev/null @@ -1,108 +0,0 @@ -defmodule Coflux.Handlers.Auth do - @moduledoc """ - Handles token authentication for API and WebSocket requests. - - Tokens are stored in $COFLUX_DATA_DIR/tokens.json with format: - ```json - { - "": { - "namespaces": ["test", "acme", null] - } - } - ``` - - - Key: SHA-256 hash of token (hex, lowercase) - - namespaces: array of allowed namespaces (null = default namespace) - - If namespaces omitted, defaults to [null] - - Auth mode is controlled by COFLUX_AUTH_MODE: - - "none" (default): No authentication required - - "token": Require valid token with namespace access - """ - - alias Coflux.Config - alias Coflux.Auth.TokenStore - - @doc """ - Checks if the request is authorized for the given namespace. - Extracts token from Authorization header or query string. - - Returns `:ok` or `{:error, :unauthorized}`. - """ - def check(req, namespace) do - case Config.auth_mode() do - :none -> - :ok - - :token -> - case get_token(req) do - nil -> {:error, :unauthorized} - token -> check_token(token, namespace) - end - end - end - - @doc """ - Checks if the given token is authorized for the namespace. - - Returns `:ok` or `{:error, :unauthorized}`. - """ - def check_token(token, namespace) do - case Config.auth_mode() do - :none -> - :ok - - :token -> - validate_token(token, namespace) - end - end - - defp validate_token(nil, _namespace), do: {:error, :unauthorized} - - defp validate_token(token, namespace) do - token_hash = hash_token(token) - - case TokenStore.lookup(token_hash) do - {:ok, token_config} -> - if namespace in token_config.namespaces do - :ok - else - {:error, :unauthorized} - end - - :error -> - {:error, :unauthorized} - end - end - - defp get_token(req) do - get_bearer_token(req) || get_query_token(req) - end - - defp get_bearer_token(req) do - case :cowboy_req.header("authorization", req) do - :undefined -> - nil - - header -> - case String.split(header, " ", parts: 2) do - ["Bearer", token] -> String.trim(token) - _ -> nil - end - end - end - - defp get_query_token(req) do - qs = :cowboy_req.parse_qs(req) - - case List.keyfind(qs, "token", 0) do - {"token", token} when is_binary(token) and token != "" -> token - _ -> nil - end - end - - defp hash_token(token) do - :crypto.hash(:sha256, token) - |> Base.encode16(case: :lower) - end -end diff --git a/server/lib/coflux/handlers/topics.ex b/server/lib/coflux/handlers/topics.ex index 61d397f5..45f3179a 100644 --- a/server/lib/coflux/handlers/topics.ex +++ b/server/lib/coflux/handlers/topics.ex @@ -7,13 +7,15 @@ defmodule Coflux.Handlers.Topics do The client should request protocols like: ["bearer.dG9rZW4=", "v1"] The server echoes back "v1" on successful auth. + + The project is determined by COFLUX_PROJECT (if set) or extracted from the + subdomain (if COFLUX_BASE_DOMAIN is set). """ import Coflux.Handlers.Utils alias Topical.Adapters.Cowboy.WebsocketHandler, as: TopicalHandler - alias Coflux.Handlers.Auth - alias Coflux.Version + alias Coflux.{Auth, Version} @protocol_version "v1" @@ -22,9 +24,9 @@ defmodule Coflux.Handlers.Topics do expected_version = get_query_param(qs, "version") protocols = parse_websocket_protocols(req) - with {:ok, namespace} <- resolve_namespace(req), - {:ok, req} <- authenticate(req, protocols, namespace) do - opts = Keyword.put(opts, :init, fn _req -> {:ok, %{namespace: namespace}} end) + with {:ok, project_id} <- resolve_project(req), + {:ok, req} <- authenticate(req, protocols, project_id) do + opts = Keyword.put(opts, :init, fn _req -> {:ok, %{project: project_id}} end) case Version.check(expected_version) do :ok -> @@ -43,34 +45,44 @@ defmodule Coflux.Handlers.Topics do {:ok, req, opts} end else + {:error, :not_configured} -> + req = json_error_response(req, "not_configured", status: 500) + {:ok, req, opts} + {:error, :invalid_host} -> req = json_error_response(req, "invalid_host", status: 400) {:ok, req, opts} + {:error, :project_required} -> + req = json_error_response(req, "project_required", status: 400) + {:ok, req, opts} + {:error, :unauthorized} -> req = json_error_response(req, "unauthorized", status: 401) {:ok, req, opts} end end - defp authenticate(req, protocols, namespace) do - case extract_bearer_token(protocols) do - {:ok, token} -> - case Auth.check_token(token, namespace) do - :ok -> - req = :cowboy_req.set_resp_header("sec-websocket-protocol", @protocol_version, req) - {:ok, req} + defp authenticate(req, protocols, project_id) do + token = + case extract_bearer_token(protocols) do + {:ok, token} -> token + :none -> nil + end - error -> - error - end + case Auth.check(token, project_id) do + :ok -> + req = + if token do + :cowboy_req.set_resp_header("sec-websocket-protocol", @protocol_version, req) + else + req + end - :none -> - # No bearer token provided - check if auth is required - case Auth.check_token(nil, namespace) do - :ok -> {:ok, req} - error -> error - end + {:ok, req} + + error -> + error end end diff --git a/server/lib/coflux/handlers/utils.ex b/server/lib/coflux/handlers/utils.ex index f79cfc20..eb7d6285 100644 --- a/server/lib/coflux/handlers/utils.ex +++ b/server/lib/coflux/handlers/utils.ex @@ -2,26 +2,42 @@ defmodule Coflux.Handlers.Utils do alias Coflux.Config @doc """ - Resolves the namespace from the request hostname. + Resolves the project from the request. - Returns `{:ok, namespace}` where namespace is a string or nil, - or `{:error, :invalid_host}` if hostname doesn't match base domain. + Behavior depends on configuration: + + - Neither set: Returns `{:error, :not_configured}` + - COFLUX_PROJECT only: Returns the configured project (any access method works) + - COFLUX_BASE_DOMAIN only: Extracts project from subdomain (subdomain required) + - Both set: Extracts from subdomain, but must match COFLUX_PROJECT """ - def resolve_namespace(req) do - hostname = :cowboy_req.host(req) + def resolve_project(req) do + configured_project = Config.project() + base_domain = Config.base_domain() - case Config.base_domain() do - nil -> - {:ok, nil} + case {configured_project, base_domain} do + {nil, nil} -> + {:error, :not_configured} + + {project_id, nil} -> + {:ok, project_id} + + {_, base_domain} -> + # Subdomain routing + hostname = :cowboy_req.host(req) - base_domain -> cond do hostname == base_domain -> - {:ok, nil} + {:error, :project_required} String.ends_with?(hostname, "." <> base_domain) -> - namespace = String.replace_suffix(hostname, "." <> base_domain, "") - {:ok, namespace} + project_id = String.replace_suffix(hostname, "." <> base_domain, "") + + if configured_project && project_id != configured_project do + {:error, :project_mismatch} + else + {:ok, project_id} + end true -> {:error, :invalid_host} @@ -29,6 +45,24 @@ defmodule Coflux.Handlers.Utils do end end + @doc """ + Extracts a bearer token from the Authorization header. + + Returns the token string or nil if not found. + """ + def get_token(req) do + case :cowboy_req.header("authorization", req) do + :undefined -> + nil + + header -> + case String.split(header, " ", parts: 2) do + ["Bearer", token] -> String.trim(token) + _ -> nil + end + end + end + def set_cors_headers(req) do origin = :cowboy_req.header("origin", req, nil) allowed_origin = get_allowed_origin(origin) diff --git a/server/lib/coflux/handlers/worker.ex b/server/lib/coflux/handlers/worker.ex index 26054108..e5de6d24 100644 --- a/server/lib/coflux/handlers/worker.ex +++ b/server/lib/coflux/handlers/worker.ex @@ -7,11 +7,14 @@ defmodule Coflux.Handlers.Worker do The client should request protocols like: ["session.dG9rZW4=", "v1"] The server echoes back "v1" on successful auth. + + The project is determined by COFLUX_PROJECT (if set) or extracted from the + subdomain (if COFLUX_BASE_DOMAIN is set). """ import Coflux.Handlers.Utils - alias Coflux.{Orchestration, Projects, Version} + alias Coflux.{Orchestration, Config, ProjectStore, Version} @protocol_version "v1" @@ -22,13 +25,26 @@ defmodule Coflux.Handlers.Worker do case Version.check(expected_version) do :ok -> - # TODO: validate - project_id = get_query_param(qs, "project") - workspace_name = get_query_param(qs, "workspace") - session_token = extract_session_token(protocols) + case resolve_project(req) do + {:ok, project_id} -> + workspace_name = get_query_param(qs, "workspace") + session_token = extract_session_token(protocols) + + req = :cowboy_req.set_resp_header("sec-websocket-protocol", @protocol_version, req) + {:cowboy_websocket, req, {project_id, workspace_name, session_token}} + + {:error, :not_configured} -> + req = json_error_response(req, "not_configured", status: 500) + {:ok, req, opts} - req = :cowboy_req.set_resp_header("sec-websocket-protocol", @protocol_version, req) - {:cowboy_websocket, req, {project_id, workspace_name, session_token}} + {:error, :project_required} -> + req = json_error_response(req, "project_required", status: 400) + {:ok, req, opts} + + {:error, :invalid_host} -> + req = json_error_response(req, "invalid_host", status: 400) + {:ok, req, opts} + end {:error, server_version, expected_version} -> req = @@ -45,27 +61,33 @@ defmodule Coflux.Handlers.Worker do end def websocket_init({project_id, workspace_name, session_token}) do - case Projects.get_project_by_id(Coflux.ProjectsServer, project_id) do - {:ok, _} -> - # TODO: monitor server? - case Orchestration.resume_session(project_id, session_token, workspace_name, self()) do - {:ok, external_id, execution_ids} -> - {[session_message(external_id)], - %{ - project_id: project_id, - session_id: external_id, - execution_ids: execution_ids - }} - - {:error, :session_invalid} -> - {[{:close, 4000, "session_invalid"}], nil} - - {:error, :workspace_mismatch} -> - {[{:close, 4000, "workspace_mismatch"}], nil} - end - - :error -> - {[{:close, 4000, "project_not_found"}], nil} + # Validate project against whitelist when using subdomain routing + project_valid = + if Config.base_domain() do + ProjectStore.exists?(project_id) + else + true + end + + if project_valid do + # TODO: monitor server? + case Orchestration.resume_session(project_id, session_token, workspace_name, self()) do + {:ok, external_id, execution_ids} -> + {[session_message(external_id)], + %{ + project_id: project_id, + session_id: external_id, + execution_ids: execution_ids + }} + + {:error, :session_invalid} -> + {[{:close, 4000, "session_invalid"}], nil} + + {:error, :workspace_mismatch} -> + {[{:close, 4000, "workspace_mismatch"}], nil} + end + else + {[{:close, 4000, "project_not_found"}], nil} end end diff --git a/server/lib/coflux/project_store.ex b/server/lib/coflux/project_store.ex new file mode 100644 index 00000000..d3f115b6 --- /dev/null +++ b/server/lib/coflux/project_store.ex @@ -0,0 +1,76 @@ +defmodule Coflux.ProjectStore do + @moduledoc """ + GenServer that owns the project whitelist ETS table. + + Only active when COFLUX_BASE_DOMAIN is configured (subdomain routing). + Projects are loaded from $COFLUX_DATA_DIR/projects.json at startup. + Reads go directly to ETS for performance. + + Project file format: + ```json + { + "acme": {}, + "demo": {} + } + ``` + + - Key: project name (string) + - Value: reserved for future configuration (currently unused) + """ + + use GenServer + + alias Coflux.Utils + + @table :coflux_projects + + # Client API + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @doc """ + Checks if a project exists in the whitelist. Reads directly from ETS. + + Returns `true` if project is whitelisted, `false` otherwise. + """ + def exists?(project) do + case :ets.lookup(@table, project) do + [{^project, _config}] -> true + [] -> false + end + end + + # Server callbacks + + @impl true + def init(_opts) do + table = :ets.new(@table, [:named_table, :public, :set, read_concurrency: true]) + + for {project, config} <- load_projects() do + :ets.insert(@table, {project, config}) + end + + {:ok, %{table: table}} + end + + defp load_projects do + path = Utils.data_path("projects.json") + + if File.exists?(path) do + path + |> File.read!() + |> Jason.decode!() + |> Map.new(fn + {project_name, config} when is_map(config) -> + {project_name, config} + + {project_name, _} -> + {project_name, %{}} + end) + else + %{} + end + end +end diff --git a/server/lib/coflux/projects.ex b/server/lib/coflux/projects.ex deleted file mode 100644 index 5959559f..00000000 --- a/server/lib/coflux/projects.ex +++ /dev/null @@ -1,184 +0,0 @@ -defmodule Coflux.Projects do - use GenServer - - alias Coflux.Utils - - def start_link(opts) do - GenServer.start_link(__MODULE__, {}, opts) - end - - def create_project(server, project_name, namespace \\ nil) do - GenServer.call(server, {:create_project, project_name, namespace}) - end - - @doc """ - Gets a project by ID, optionally validating namespace access. - - Returns: - - {:ok, project} if project exists and namespace matches (if provided) - - :error if project doesn't exist or belongs to a different namespace - """ - def get_project_by_id(server, project_id, namespace \\ nil) do - GenServer.call(server, {:get_project_by_id, project_id, namespace}) - end - - def subscribe(server, pid, namespace \\ nil) do - GenServer.call(server, {:subscribe, pid, namespace}) - end - - def unsubscribe(server, ref) do - GenServer.cast(server, {:unsubscribe, ref}) - end - - def init({}) do - path = get_path() - - projects = - if File.exists?(path) do - path - |> File.read!() - |> Jason.decode!() - |> Map.new(fn {project_id, project} -> - {project_id, build_project(project)} - end) - else - %{} - end - - {:ok, %{projects: projects, subscribers: %{}}} - end - - def handle_call({:create_project, project_name, namespace}, _from, state) do - existing_project_names = - state.projects - |> Map.values() - |> Enum.filter(&(&1.namespace == namespace)) - |> MapSet.new(& &1.name) - - errors = - Map.reject( - %{ - project_name: validate_project_name(project_name, existing_project_names) - }, - fn {_key, value} -> value == :ok end - ) - - if Enum.any?(errors) do - {:reply, {:error, errors}, state} - else - project_id = generate_id(state) - - state = - put_in( - state.projects[project_id], - %{name: project_name, namespace: namespace} - ) - - save_projects(state) - notify_subscribers(state, project_id) - {:reply, {:ok, project_id}, state} - end - end - - def handle_call({:get_project_by_id, project_id, namespace}, _from, state) do - result = - case Map.fetch(state.projects, project_id) do - {:ok, project} -> - if project.namespace == namespace do - {:ok, project} - else - :error - end - - :error -> - :error - end - - {:reply, result, state} - end - - def handle_call({:subscribe, pid, namespace}, _from, state) do - ref = Process.monitor(pid) - state = put_in(state.subscribers[ref], {pid, namespace}) - - filtered_projects = - state.projects - |> Enum.filter(fn {_id, project} -> project.namespace == namespace end) - |> Map.new() - - {:reply, {ref, filtered_projects}, state} - end - - def handle_cast({:unsubscribe, ref}, state) do - state = remove_subscriber(state, ref) - {:noreply, state} - end - - def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do - state = remove_subscriber(state, ref) - {:noreply, state} - end - - defp get_path() do - Utils.data_path("projects.json") - end - - defp remove_subscriber(state, ref) do - Map.update!(state, :subscribers, &Map.delete(&1, ref)) - end - - defp notify_subscribers(state, project_id) do - project = Map.fetch!(state.projects, project_id) - - Enum.each(state.subscribers, fn {ref, {pid, namespace}} -> - if project.namespace == namespace do - send(pid, {:project, ref, project_id, project}) - end - end) - end - - defp save_projects(state) do - projects_for_json = - Map.new(state.projects, fn {id, project} -> - json_project = - if project.namespace do - %{"name" => project.name, "namespace" => project.namespace} - else - %{"name" => project.name} - end - - {id, json_project} - end) - - content = Jason.encode!(projects_for_json) - path = get_path() - :ok = File.mkdir_p!(Path.dirname(path)) - File.write!(path, content) - end - - def generate_id(state, length \\ 5) do - id = Utils.generate_id(length, "p") - - if Map.has_key?(state.projects, id) do - generate_id(state, length + 1) - else - id - end - end - - defp build_project(project) do - %{ - name: Map.fetch!(project, "name"), - namespace: Map.get(project, "namespace") - } - end - - defp validate_project_name(name, existing) do - cond do - # TODO: stricter validation? - !name -> :invalid - Enum.member?(existing, name) -> :exists - true -> :ok - end - end -end diff --git a/server/lib/coflux/topic_utils.ex b/server/lib/coflux/topic_utils.ex index ab29c061..58ec81f8 100644 --- a/server/lib/coflux/topic_utils.ex +++ b/server/lib/coflux/topic_utils.ex @@ -1,15 +1,4 @@ defmodule Coflux.TopicUtils do - alias Coflux.Projects - - @projects_server Coflux.ProjectsServer - - def validate_project_access(project_id, namespace) do - case Projects.get_project_by_id(@projects_server, project_id, namespace) do - {:ok, _project} -> :ok - :error -> {:error, :not_found} - end - end - def build_value(value) do case value do {:raw, data, references} -> diff --git a/server/lib/coflux/topics/logs.ex b/server/lib/coflux/topics/logs.ex index 88833b85..d206ee13 100644 --- a/server/lib/coflux/topics/logs.ex +++ b/server/lib/coflux/topics/logs.ex @@ -1,20 +1,16 @@ defmodule Coflux.Topics.Logs do - use Topical.Topic, route: ["projects", :project_id, "runs", :run_id, "logs", :workspace_id] + use Topical.Topic, route: ["runs", :run_id, "logs", :workspace_id] alias Coflux.Orchestration import Coflux.TopicUtils def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end + {:ok, Map.put(params, :project, context.project)} end def init(params) do - project_id = Map.fetch!(params, :project_id) + project_id = Map.fetch!(params, :project) run_id = Map.fetch!(params, :run_id) workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) diff --git a/server/lib/coflux/topics/module.ex b/server/lib/coflux/topics/module.ex index 4f342520..a9f5cc6c 100644 --- a/server/lib/coflux/topics/module.ex +++ b/server/lib/coflux/topics/module.ex @@ -1,21 +1,15 @@ defmodule Coflux.Topics.Module do use Topical.Topic, - route: ["projects", :project_id, "modules", :module, :workspace_id] + route: ["modules", :module, :workspace_id] alias Coflux.Orchestration - import Coflux.TopicUtils, only: [validate_project_access: 2] - def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end + {:ok, Map.put(params, :project, context.project)} end def init(params) do - project_id = Map.fetch!(params, :project_id) + project_id = Map.fetch!(params, :project) module = Map.fetch!(params, :module) workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) diff --git a/server/lib/coflux/topics/modules.ex b/server/lib/coflux/topics/modules.ex index 0ff7f11a..2cdc3ec7 100644 --- a/server/lib/coflux/topics/modules.ex +++ b/server/lib/coflux/topics/modules.ex @@ -1,20 +1,14 @@ defmodule Coflux.Topics.Modules do - use Topical.Topic, route: ["projects", :project_id, "modules", :workspace_id] + use Topical.Topic, route: ["modules", :workspace_id] alias Coflux.Orchestration - import Coflux.TopicUtils, only: [validate_project_access: 2] - def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end + {:ok, Map.put(params, :project, context.project)} end def init(params) do - project_id = Map.fetch!(params, :project_id) + project_id = Map.fetch!(params, :project) workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) {:ok, manifests, executions, ref} = diff --git a/server/lib/coflux/topics/pool.ex b/server/lib/coflux/topics/pool.ex index 1ec22955..36ec22f9 100644 --- a/server/lib/coflux/topics/pool.ex +++ b/server/lib/coflux/topics/pool.ex @@ -1,20 +1,14 @@ defmodule Coflux.Topics.Pool do - use Topical.Topic, route: ["projects", :project_id, "pools", :workspace_id, :pool_name] + use Topical.Topic, route: ["pools", :workspace_id, :pool_name] alias Coflux.Orchestration - import Coflux.TopicUtils, only: [validate_project_access: 2] - def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end + {:ok, Map.put(params, :project, context.project)} end def init(params) do - project_id = Map.fetch!(params, :project_id) + project_id = Map.fetch!(params, :project) workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) pool_name = Map.fetch!(params, :pool_name) diff --git a/server/lib/coflux/topics/pools.ex b/server/lib/coflux/topics/pools.ex index f5355e58..10e51773 100644 --- a/server/lib/coflux/topics/pools.ex +++ b/server/lib/coflux/topics/pools.ex @@ -1,20 +1,14 @@ defmodule Coflux.Topics.Pools do - use Topical.Topic, route: ["projects", :project_id, "pools", :workspace_id] + use Topical.Topic, route: ["pools", :workspace_id] alias Coflux.Orchestration - import Coflux.TopicUtils, only: [validate_project_access: 2] - def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end + {:ok, Map.put(params, :project, context.project)} end def init(params) do - project_id = Map.fetch!(params, :project_id) + project_id = Map.fetch!(params, :project) workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) {:ok, pools, ref} = diff --git a/server/lib/coflux/topics/projects.ex b/server/lib/coflux/topics/projects.ex deleted file mode 100644 index 1d65db3e..00000000 --- a/server/lib/coflux/topics/projects.ex +++ /dev/null @@ -1,23 +0,0 @@ -defmodule Coflux.Topics.Projects do - alias Coflux.Projects - - use Topical.Topic, route: ["projects"] - - @server Coflux.ProjectsServer - - def connect(params, context) do - namespace = Map.get(context, :namespace) - {:ok, Map.put(params, :namespace, namespace)} - end - - def init(params) do - namespace = Map.get(params, :namespace) - {ref, projects} = Projects.subscribe(@server, self(), namespace) - {:ok, Topic.new(projects, %{ref: ref})} - end - - def handle_info({:project, _ref, project_id, project}, topic) do - topic = Topic.set(topic, [project_id], project) - {:ok, topic} - end -end diff --git a/server/lib/coflux/topics/run.ex b/server/lib/coflux/topics/run.ex index a5b15ba4..caa11241 100644 --- a/server/lib/coflux/topics/run.ex +++ b/server/lib/coflux/topics/run.ex @@ -1,20 +1,16 @@ defmodule Coflux.Topics.Run do - use Topical.Topic, route: ["projects", :project_id, "runs", :run_id, :workspace_id] + use Topical.Topic, route: ["runs", :run_id, :workspace_id] alias Coflux.Orchestration import Coflux.TopicUtils def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end + {:ok, Map.put(params, :project, context.project)} end def init(params) do - project_id = Map.fetch!(params, :project_id) + project_id = Map.fetch!(params, :project) external_run_id = Map.fetch!(params, :run_id) workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) @@ -41,7 +37,7 @@ defmodule Coflux.Topics.Run do {:ok, Topic.new(build_run(run, parent, steps, workspace_ids), %{ - project_id: project_id, + project: project_id, external_run_id: external_run_id, workspace_ids: workspace_ids })} diff --git a/server/lib/coflux/topics/search.ex b/server/lib/coflux/topics/search.ex index 68cd1f5c..d5e8a98b 100644 --- a/server/lib/coflux/topics/search.ex +++ b/server/lib/coflux/topics/search.ex @@ -1,20 +1,14 @@ defmodule Coflux.Topics.Search do alias Coflux.Orchestration - use Topical.Topic, route: ["projects", :project_id, "search", :workspace_id] - - import Coflux.TopicUtils, only: [validate_project_access: 2] + use Topical.Topic, route: ["search", :workspace_id] def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end + {:ok, Map.put(params, :project, context.project)} end def init(params) do - project_id = Map.fetch!(params, :project_id) + project_id = Map.fetch!(params, :project) workspace_id = Map.fetch!(params, :workspace_id) case Orchestration.subscribe_targets(project_id, workspace_id, self()) do diff --git a/server/lib/coflux/topics/sessions.ex b/server/lib/coflux/topics/sessions.ex index 2bd61eb8..0acf15ad 100644 --- a/server/lib/coflux/topics/sessions.ex +++ b/server/lib/coflux/topics/sessions.ex @@ -1,20 +1,14 @@ defmodule Coflux.Topics.Sessions do - use Topical.Topic, route: ["projects", :project_id, "sessions", :workspace_id] + use Topical.Topic, route: ["sessions", :workspace_id] alias Coflux.Orchestration - import Coflux.TopicUtils, only: [validate_project_access: 2] - def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end + {:ok, Map.put(params, :project, context.project)} end def init(params) do - project_id = Map.fetch!(params, :project_id) + project_id = Map.fetch!(params, :project) workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) {:ok, sessions, ref} = Orchestration.subscribe_sessions(project_id, workspace_id, self()) diff --git a/server/lib/coflux/topics/workflow.ex b/server/lib/coflux/topics/workflow.ex index bd46bd7c..1df2f779 100644 --- a/server/lib/coflux/topics/workflow.ex +++ b/server/lib/coflux/topics/workflow.ex @@ -1,21 +1,15 @@ defmodule Coflux.Topics.Workflow do use Topical.Topic, - route: ["projects", :project_id, "workflows", :module, :target, :workspace_id] + route: ["workflows", :module, :target, :workspace_id] alias Coflux.Orchestration - import Coflux.TopicUtils, only: [validate_project_access: 2] - def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end + {:ok, Map.put(params, :project, context.project)} end def init(params) do - project_id = Map.fetch!(params, :project_id) + project_id = Map.fetch!(params, :project) module = Map.fetch!(params, :module) target_name = Map.fetch!(params, :target) workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) diff --git a/server/lib/coflux/topics/workspaces.ex b/server/lib/coflux/topics/workspaces.ex index 464dc9b8..5d8216c4 100644 --- a/server/lib/coflux/topics/workspaces.ex +++ b/server/lib/coflux/topics/workspaces.ex @@ -1,20 +1,14 @@ defmodule Coflux.Topics.Workspaces do - use Topical.Topic, route: ["projects", :project_id, "workspaces"] + use Topical.Topic, route: ["workspaces"] alias Coflux.Orchestration - import Coflux.TopicUtils, only: [validate_project_access: 2] - def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end + {:ok, Map.put(params, :project, context.project)} end def init(params) do - project_id = Map.fetch!(params, :project_id) + project_id = Map.fetch!(params, :project) {:ok, workspaces, ref} = Orchestration.subscribe_workspaces(project_id, self()) workspaces = From 0c6362973bc528339c7e4087816e8cfa9a29dfaf Mon Sep 17 00:00:00 2001 From: Joe Freeman Date: Fri, 30 Jan 2026 17:47:31 +0000 Subject: [PATCH 2/2] Move tokens to projects file --- server/lib/coflux/application.ex | 6 +- server/lib/coflux/auth.ex | 62 +++++++++++---------- server/lib/coflux/auth/token_store.ex | 79 --------------------------- server/lib/coflux/config.ex | 16 ------ server/lib/coflux/project_store.ex | 64 +++++++++++++++++----- 5 files changed, 84 insertions(+), 143 deletions(-) delete mode 100644 server/lib/coflux/auth/token_store.ex diff --git a/server/lib/coflux/application.ex b/server/lib/coflux/application.ex index 0527d0c3..7d1a498c 100644 --- a/server/lib/coflux/application.ex +++ b/server/lib/coflux/application.ex @@ -2,7 +2,6 @@ defmodule Coflux.Application do use Application alias Coflux.{Config, ProjectStore, Orchestration, Topics} - alias Coflux.Auth.TokenStore @impl true def start(_type, _args) do @@ -12,16 +11,13 @@ defmodule Coflux.Application do children = [ - TokenStore, - # ProjectStore only needed for subdomain routing (COFLUX_BASE_DOMAIN set) - if(Config.base_domain(), do: ProjectStore), + ProjectStore, # TODO: separate launch supervisor per project? (and specify max_children?) {Task.Supervisor, name: Coflux.LauncherSupervisor}, Orchestration.Supervisor, {Topical, name: Coflux.TopicalRegistry, topics: topics()}, {Coflux.Web, port: port} ] - |> Enum.filter(& &1) opts = [strategy: :one_for_one, name: Coflux.Supervisor] diff --git a/server/lib/coflux/auth.ex b/server/lib/coflux/auth.ex index 6eb56d20..e78e2a39 100644 --- a/server/lib/coflux/auth.ex +++ b/server/lib/coflux/auth.ex @@ -2,55 +2,59 @@ defmodule Coflux.Auth do @moduledoc """ Handles token authentication. - Tokens are stored in $COFLUX_DATA_DIR/tokens.json with format: + Token access is configured per-project in $COFLUX_DATA_DIR/projects.json: ```json { - "": { - "projects": ["acme", "demo"] - } + "acme": { + "tokens": ["", ""] + }, + "demo": {} } ``` - - Key: SHA-256 hash of token (hex, lowercase) - - projects: array of allowed project names (empty array = all projects) - - Auth mode is controlled by COFLUX_AUTH_MODE: - - "none" (default): No authentication required - - "token": Require valid token with project access + The `tokens` field controls access: + - missing/null: open access (no token required) + - [] (empty array): locked (no valid tokens, access denied) + - ["hash1", ...]: restricted to listed token hashes """ - alias Coflux.Config - alias Coflux.Auth.TokenStore + alias Coflux.ProjectStore @doc """ Checks if the given token is authorized for the project. - Returns `:ok` when auth is disabled or token is valid. + Returns `:ok` when access is allowed. Returns `{:error, :unauthorized}` otherwise. """ def check(token, project_id) do - case Config.auth_mode() do - :none -> :ok - :token -> validate_token(token, project_id) + case ProjectStore.get_tokens(project_id) do + {:ok, nil} -> + # No token auth configured - open access + :ok + + {:ok, []} -> + # Empty tokens list - locked, no valid tokens + {:error, :unauthorized} + + {:ok, tokens} -> + # Token list configured - must provide valid token + validate_token(token, tokens) + + :error -> + # Project not found (shouldn't happen if validate_project passed) + {:error, :unauthorized} end end - defp validate_token(nil, _project_id), do: {:error, :unauthorized} + defp validate_token(nil, _tokens), do: {:error, :unauthorized} - defp validate_token(token, project_id) do + defp validate_token(token, tokens) do token_hash = hash_token(token) - case TokenStore.lookup(token_hash) do - {:ok, token_config} -> - # Empty projects list means access to all projects - if token_config.projects == [] or project_id in token_config.projects do - :ok - else - {:error, :unauthorized} - end - - :error -> - {:error, :unauthorized} + if token_hash in tokens do + :ok + else + {:error, :unauthorized} end end diff --git a/server/lib/coflux/auth/token_store.ex b/server/lib/coflux/auth/token_store.ex deleted file mode 100644 index c2c3b7e8..00000000 --- a/server/lib/coflux/auth/token_store.ex +++ /dev/null @@ -1,79 +0,0 @@ -defmodule Coflux.Auth.TokenStore do - @moduledoc """ - GenServer that owns the auth tokens ETS table. - - Tokens are loaded from $COFLUX_DATA_DIR/tokens.json at startup. - Reads go directly to ETS for performance. - - Token file format: - ```json - { - "": { - "projects": ["acme", "demo"] - } - } - ``` - - - Key: SHA-256 hash of token (hex, lowercase) - - projects: array of allowed project names (empty array = all projects) - """ - - use GenServer - - defmodule TokenConfig do - defstruct projects: [] - end - - @table :coflux_auth_tokens - - alias Coflux.Utils - - # Client API - - def start_link(opts \\ []) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) - end - - @doc """ - Looks up a token by its hash. Reads directly from ETS. - - Returns `{:ok, %TokenConfig{}}` if found, `:error` otherwise. - """ - def lookup(token_hash) do - case :ets.lookup(@table, token_hash) do - [{^token_hash, config}] -> {:ok, config} - [] -> :error - end - end - - # Server callbacks - - @impl true - def init(_opts) do - table = :ets.new(@table, [:named_table, :public, :set, read_concurrency: true]) - - for {hash, config} <- load_tokens() do - :ets.insert(@table, {hash, config}) - end - - {:ok, %{table: table}} - end - - defp load_tokens do - path = Utils.data_path("tokens.json") - - if File.exists?(path) do - path - |> File.read!() - |> Jason.decode!() - |> Map.new(fn {hash, config} -> {hash, parse_config(config)} end) - else - %{} - end - end - - defp parse_config(config) do - projects = Map.get(config, "projects", []) - %TokenConfig{projects: projects} - end -end diff --git a/server/lib/coflux/config.ex b/server/lib/coflux/config.ex index d3991068..bea4adc4 100644 --- a/server/lib/coflux/config.ex +++ b/server/lib/coflux/config.ex @@ -24,7 +24,6 @@ defmodule Coflux.Config do """ def init do :persistent_term.put(:coflux_data_dir, parse_data_dir()) - :persistent_term.put(:coflux_auth_mode, parse_auth_mode()) :persistent_term.put(:coflux_project, System.get_env("COFLUX_PROJECT")) :persistent_term.put(:coflux_base_domain, System.get_env("COFLUX_BASE_DOMAIN")) :persistent_term.put(:coflux_allowed_origins, parse_allowed_origins()) @@ -38,13 +37,6 @@ defmodule Coflux.Config do :persistent_term.get(:coflux_data_dir) end - @doc """ - Returns the auth mode: `:none` (default) or `:token`. - """ - def auth_mode do - :persistent_term.get(:coflux_auth_mode) - end - @doc """ Returns the configured project name, or nil if not set. @@ -74,14 +66,6 @@ defmodule Coflux.Config do System.get_env("COFLUX_DATA_DIR", Path.join(File.cwd!(), "data")) end - defp parse_auth_mode do - case System.get_env("COFLUX_AUTH_MODE", "none") do - "none" -> :none - "token" -> :token - other -> raise "Invalid COFLUX_AUTH_MODE: #{inspect(other)}. Must be \"none\" or \"token\"." - end - end - @default_allowed_origins ["https://studio.coflux.com"] defp parse_allowed_origins do diff --git a/server/lib/coflux/project_store.ex b/server/lib/coflux/project_store.ex index d3f115b6..a64c0a83 100644 --- a/server/lib/coflux/project_store.ex +++ b/server/lib/coflux/project_store.ex @@ -1,26 +1,31 @@ defmodule Coflux.ProjectStore do @moduledoc """ - GenServer that owns the project whitelist ETS table. + GenServer that owns the project configuration ETS table. - Only active when COFLUX_BASE_DOMAIN is configured (subdomain routing). Projects are loaded from $COFLUX_DATA_DIR/projects.json at startup. + If COFLUX_PROJECT is set, that project is added automatically. Reads go directly to ETS for performance. Project file format: ```json { - "acme": {}, + "acme": { + "tokens": ["", ""] + }, "demo": {} } ``` - Key: project name (string) - - Value: reserved for future configuration (currently unused) + - tokens: controls access to the project: + - missing/null: open access (no token auth) + - [] (empty array): locked (no valid tokens) + - ["hash1", ...]: restricted to listed token hashes (SHA-256, hex, lowercase) """ use GenServer - alias Coflux.Utils + alias Coflux.{Config, Utils} @table :coflux_projects @@ -31,9 +36,9 @@ defmodule Coflux.ProjectStore do end @doc """ - Checks if a project exists in the whitelist. Reads directly from ETS. + Checks if a project exists. Reads directly from ETS. - Returns `true` if project is whitelisted, `false` otherwise. + Returns `true` if project exists, `false` otherwise. """ def exists?(project) do case :ets.lookup(@table, project) do @@ -42,13 +47,34 @@ defmodule Coflux.ProjectStore do end end + @doc """ + Gets the list of valid token hashes for a project. Reads directly from ETS. + + Returns `{:ok, [hash1, hash2, ...]}` if project exists, `:error` otherwise. + """ + def get_tokens(project) do + case :ets.lookup(@table, project) do + [{^project, config}] -> {:ok, config.tokens} + [] -> :error + end + end + # Server callbacks @impl true def init(_opts) do table = :ets.new(@table, [:named_table, :public, :set, read_concurrency: true]) - for {project, config} <- load_projects() do + projects = load_projects() + + # Add COFLUX_PROJECT if set and not already in file (with open access) + projects = + case Config.project() do + nil -> projects + project_id -> Map.put_new(projects, project_id, %{tokens: nil}) + end + + for {project, config} <- projects do :ets.insert(@table, {project, config}) end @@ -62,15 +88,25 @@ defmodule Coflux.ProjectStore do path |> File.read!() |> Jason.decode!() - |> Map.new(fn - {project_name, config} when is_map(config) -> - {project_name, config} - - {project_name, _} -> - {project_name, %{}} + |> Map.new(fn {project_name, config} -> + {project_name, parse_config(config)} end) else %{} end end + + defp parse_config(config) when is_map(config) do + # nil/missing = open access, [] = locked, [...] = restricted + tokens = + case Map.get(config, "tokens") do + nil -> nil + list when is_list(list) -> list + _ -> nil + end + + %{tokens: tokens} + end + + defp parse_config(_), do: %{tokens: nil} end