diff --git a/clients/python/coflux/__main__.py b/clients/python/coflux/__main__.py index 24c484c4..40a1449a 100644 --- a/clients/python/coflux/__main__.py +++ b/clients/python/coflux/__main__.py @@ -276,6 +276,7 @@ def _init( serialiser_configs: list[config.SerialiserConfig], blob_threshold: int, blob_store_configs: list[config.BlobStoreConfig], + log_store_config: config.LogStoreConfig, concurrency: int, session_id: str | None, register: bool, @@ -310,6 +311,7 @@ def _init( serialiser_configs, blob_threshold, blob_store_configs, + log_store_config, session_id, targets, ) as worker: @@ -822,7 +824,7 @@ def blobs_get(host: str, secure: bool | None, key: str): raise click.ClickException("Blob store not configured") out = click.get_binary_stream("stdout") - with BlobManager(config.blobs.stores, host, secure=use_secure) as blob_manager: + with BlobManager(config.blobs.stores, server_host=host, server_secure=use_secure) as blob_manager: blob = blob_manager.get(key) for chunk in iter(lambda: blob.read(64 * 1024), b""): out.write(chunk) @@ -965,7 +967,7 @@ def assets_download( total_size = sum(v["size"] for v in entries.values()) - with BlobManager(config.blobs.stores, host, secure=use_secure) as blob_manager: + with BlobManager(config.blobs.stores, server_host=host, server_secure=use_secure) as blob_manager: click.echo(f"Downloading {len(entries)} files ({_human_size(total_size)})...") # TODO: parallelise downloads with click.progressbar(entries.items(), label="") as bar: @@ -1076,6 +1078,7 @@ def worker( "serialiser_configs": config and config.serialisers, "blob_threshold": config and config.blobs and config.blobs.threshold, "blob_store_configs": config and config.blobs and config.blobs.stores, + "log_store_config": config and config.logs and config.logs.store, "concurrency": concurrency, "session_id": session, "register": register or dev, diff --git a/clients/python/coflux/blobs.py b/clients/python/coflux/blobs.py index 438f54c3..4cd49ed9 100644 --- a/clients/python/coflux/blobs.py +++ b/clients/python/coflux/blobs.py @@ -50,8 +50,8 @@ def upload(self, path: Path) -> str: class HttpStore(Store): - def __init__(self, protocol: t.Literal["http", "https"], host: str): - self._protocol = protocol + def __init__(self, host: str, *, secure: bool = False): + self._protocol = "https" if secure else "http" self._host = host def __enter__(self): @@ -161,11 +161,11 @@ def upload(self, path: Path) -> str: return self.put(file) -def _create(config_: config.BlobStoreConfig, server_host: str, secure: bool): +def _create(config_: config.BlobStoreConfig, server_host: str, server_secure: bool): if config_.type == "http": - # Use secure setting if config uses default host, otherwise respect config's protocol - protocol = ("https" if secure else "http") if config_.host is None else config_.protocol - return HttpStore(protocol, config_.host or server_host) + host = config_.host if config_.host is not None else server_host + secure = config_.secure if config_.secure is not None else server_secure + return HttpStore(host, secure=secure) elif config_.type == "s3": return S3Store(config_.bucket, config_.prefix, config_.region) else: @@ -173,8 +173,8 @@ def _create(config_: config.BlobStoreConfig, server_host: str, secure: bool): class Manager: - def __init__(self, store_configs: list[config.BlobStoreConfig], server_host: str, *, secure: bool): - self._stores = [_create(c, server_host, secure) for c in store_configs] + def __init__(self, store_configs: list[config.BlobStoreConfig], *, server_host: str, server_secure: 
bool): + self._stores = [_create(c, server_host, server_secure) for c in store_configs] def __enter__(self): # TODO: ? diff --git a/clients/python/coflux/config.py b/clients/python/coflux/config.py index 8d6d475c..3926c996 100644 --- a/clients/python/coflux/config.py +++ b/clients/python/coflux/config.py @@ -12,8 +12,8 @@ class ServerConfig(pydantic.BaseModel): class HTTPBlobStoreConfig(pydantic.BaseModel): type: t.Literal["http"] = "http" - protocol: t.Literal["http", "https"] = "http" host: str | None = None + secure: bool | None = None class S3BlobStoreConfig(pydantic.BaseModel): @@ -29,7 +29,7 @@ class S3BlobStoreConfig(pydantic.BaseModel): ] -def _default_blob_stores(): +def _default_blob_stores() -> list[BlobStoreConfig]: return [HTTPBlobStoreConfig()] @@ -38,6 +38,32 @@ class BlobsConfig(pydantic.BaseModel): stores: list[BlobStoreConfig] = pydantic.Field(default_factory=_default_blob_stores) +class HTTPLogStoreConfig(pydantic.BaseModel): + """HTTP-based log store that POSTs logs to the Coflux server.""" + + type: t.Literal["http"] = "http" + host: str | None = None + secure: bool | None = None + batch_size: int = 100 + flush_interval: float = 0.5 + + +LogStoreConfig = t.Annotated[ + HTTPLogStoreConfig, + pydantic.Field(discriminator="type"), +] + + +def _default_log_store(): + return HTTPLogStoreConfig() + + +class LogsConfig(pydantic.BaseModel): + """Configuration for log storage.""" + + store: LogStoreConfig = pydantic.Field(default_factory=_default_log_store) + + class PandasSerialiserConfig(pydantic.BaseModel): type: t.Literal["pandas"] = "pandas" @@ -78,6 +104,7 @@ class Config(pydantic.BaseModel): default_factory=_default_serialisers ) blobs: BlobsConfig = pydantic.Field(default_factory=BlobsConfig) + logs: LogsConfig = pydantic.Field(default_factory=LogsConfig) class DockerLauncherConfig(pydantic.BaseModel): diff --git a/clients/python/coflux/execution.py b/clients/python/coflux/execution.py index 66e7e250..2003a197 100644 --- a/clients/python/coflux/execution.py +++ b/clients/python/coflux/execution.py @@ -18,7 +18,7 @@ from contextvars import ContextVar from pathlib import Path -from . import blobs, config, loader, models, serialisation, server, types, utils +from . 
import blobs, config, loader, logs, models, serialisation, server, types, utils _EXECUTION_THRESHOLD_S = 1.0 _WORKER_THRESHOLD_S = 5.0 @@ -98,7 +98,7 @@ class SubmitExecutionRequest(t.NamedTuple): class ResolveReferenceRequest(t.NamedTuple): - execution_id: int + execution_id: str timeout: float | None @@ -108,7 +108,7 @@ class PersistAssetRequest(t.NamedTuple): class GetAssetRequest(t.NamedTuple): - asset_id: int + asset_id: str class SuspendRequest(t.NamedTuple): @@ -116,22 +116,15 @@ class SuspendRequest(t.NamedTuple): class CancelExecutionRequest(t.NamedTuple): - execution_id: int + execution_id: str class RegisterGroupRequest(t.NamedTuple): - execution_id: int + execution_id: str group_id: int name: str | None -class LogMessageRequest(t.NamedTuple): - level: int - template: str | None - values: dict[str, t.Any] - timestamp: int - - class RemoteException(Exception): def __init__(self, message, remote_type): super().__init__(message) @@ -253,22 +246,30 @@ def _prepare_asset_entries_for_path( class Channel: def __init__( self, - execution_id: int, + execution_id: str, serialiser_configs: list[config.SerialiserConfig], blob_threshold: int, blob_store_configs: list[config.BlobStoreConfig], + log_store_config: config.LogStoreConfig, server_host: str, secure: bool, connection, + run_id: str, + workspace_id: str, ): self._execution_id = execution_id + self._run_id = run_id + self._workspace_id = workspace_id + self._log_store_config = log_store_config + self._server_host = server_host + self._secure = secure self._connection = connection self._request_id = counter() self._requests: dict[int, Future[t.Any]] = {} self._groups: list[str | None] = [] self._running = True self._exit_stack = contextlib.ExitStack() - self._blob_manager = blobs.Manager(blob_store_configs, server_host, secure=secure) + self._blob_manager = blobs.Manager(blob_store_configs, server_host=server_host, server_secure=secure) self._serialisation_manager = serialisation.Manager( serialiser_configs, blob_threshold, self._blob_manager ) @@ -276,6 +277,10 @@ def __init__( def __enter__(self): self._directory = self._exit_stack.enter_context(working_directory()) self._exit_stack.enter_context(self._blob_manager) + self._log_store = logs.create_log_store( + self._log_store_config, server_host=self._server_host, server_secure=self._secure + ) + self._exit_stack.enter_context(self._log_store) return self def __exit__(self, exc_type, exc_value, traceback): @@ -344,7 +349,7 @@ def submit_execution( serialised_arguments = [ self._serialisation_manager.serialise(a) for a in arguments ] - execution_id = self._request( + execution_id, metadata = self._request( SubmitExecutionRequest( module, target, @@ -361,7 +366,7 @@ def submit_execution( requires, ) ) - return models.Execution(execution_id) + return models.Execution(execution_id, metadata) @contextmanager def group(self, name: str | None): @@ -434,11 +439,11 @@ def _deserialise_result(self, result: types.Result): case result: raise Exception(f"unexpected result ({result})") - def resolve_reference(self, execution_id: int) -> t.Any: + def resolve_reference(self, execution_id: str) -> t.Any: result = self._request(ResolveReferenceRequest(execution_id, _timeout.get())) return self._deserialise_result(result) - def cancel_execution(self, execution_id: int) -> None: + def cancel_execution(self, execution_id: str) -> None: # TODO: wait for confirmation? 
self._notify(CancelExecutionRequest(execution_id)) @@ -516,10 +521,10 @@ def create_asset( raise Exception(f"Unhandled entry type ({type(entry)})") else: raise Exception(f"Unhandled entries type ({type(entries)})") - asset_id = self._request(PersistAssetRequest(name, entries_)) - return models.Asset(asset_id) + asset_id, metadata = self._request(PersistAssetRequest(name, entries_)) + return models.Asset(asset_id, metadata) - def resolve_asset(self, asset_id: int) -> list[models.AssetEntry]: + def resolve_asset(self, asset_id: str) -> list[models.AssetEntry]: result = self._request(GetAssetRequest(asset_id)) return [ models.AssetEntry(path, blob_key, size, metadata) @@ -531,18 +536,18 @@ def download_blob(self, blob_key: str, path: Path): def log_message(self, level, template: str | None, **kwargs): timestamp = time.time() * 1000 - values = { key: self._serialisation_manager.serialise(value) for key, value in kwargs.items() } - self._notify( - LogMessageRequest( - level, - str(template) if template is not None else None, - values, - int(timestamp), - ) + self._log_store.write( + self._run_id, + self._execution_id, + self._workspace_id, + int(timestamp), + level, + str(template) if template is not None else None, + values, ) @@ -562,13 +567,16 @@ def _execute( module_name: str, target_name: str, arguments: list[types.Value], - execution_id: int, + execution_id: str, serialiser_configs: list[config.SerialiserConfig], blob_threshold: int, blob_store_configs: list[config.BlobStoreConfig], + log_store_config: config.LogStoreConfig, server_host: str, secure: bool, conn, + run_id: str, + workspace_id: str, ): global _channel_context module = loader.load_module(module_name) @@ -577,9 +585,12 @@ def _execute( serialiser_configs, blob_threshold, blob_store_configs, + log_store_config, server_host, secure, conn, + run_id, + workspace_id, ) as channel: threading.Thread(target=channel.run).start() _channel_context = channel @@ -595,14 +606,14 @@ def _execute( def _json_safe_reference(reference: types.Reference) -> t.Any: match reference: - case ("execution", execution_id): + case ("execution", execution_id, _metadata): return ["execution", execution_id] - case ("asset", asset_id): + case ("asset", asset_id, _metadata): return ["asset", asset_id] case ("fragment", serialiser, blob_key, size, metadata): return ["fragment", serialiser, blob_key, size, metadata] case other: - raise Exception(f"unhandled reference type ({other})") + raise ValueError(f"Unknown reference type: {other}") def _json_safe_references(references: list[types.Reference]) -> list[t.Any]: @@ -624,14 +635,27 @@ def _json_safe_arguments(arguments: list[types.Value]): def _parse_reference(reference) -> types.Reference: match reference: - case ["execution", execution_id]: - return ("execution", execution_id) - case ["asset", asset_id]: - return ("asset", asset_id) + case ["execution", execution_id, run_id, step_id, attempt, module, target]: + metadata = models.ExecutionMetadata( + run_id=run_id, + step_id=step_id, + attempt=attempt, + module=module, + target=target, + ) + return ("execution", execution_id, metadata) + case ["asset", external_id, name, total_count, total_size]: + metadata = models.AssetMetadata( + external_id=external_id, + name=name, + total_count=total_count, + total_size=total_size, + ) + return ("asset", external_id, metadata) case ["fragment", serialiser, blob_key, size, metadata]: return ("fragment", serialiser, blob_key, size, metadata) case other: - raise Exception(f"unrecognised reference: {other}") + raise 
ValueError(f"Unknown reference format: {other}") def _parse_references(references) -> list[types.Reference]: @@ -662,26 +686,64 @@ def _parse_result(result: t.Any) -> types.Result: raise Exception(f"unrecognised result: {other}") +def _parse_submit_result(result: t.Any) -> tuple[str, models.ExecutionMetadata]: + """Parse the submit response to extract execution ID and metadata.""" + match result: + case [execution_id, run_id, step_id, attempt, module, target]: + metadata = models.ExecutionMetadata( + run_id=run_id, + step_id=step_id, + attempt=attempt, + module=module, + target=target, + ) + return (execution_id, metadata) + case other: + raise Exception(f"unrecognised submit result: {other}") + + +def _parse_asset_result(result: t.Any) -> tuple[str, models.AssetMetadata]: + """Parse the put_asset response to extract asset ID and metadata.""" + match result: + case [asset_id, external_id, name, total_count, total_size]: + metadata = models.AssetMetadata( + external_id=external_id, + name=name, + total_count=total_count, + total_size=total_size, + ) + return (asset_id, metadata) + case other: + raise Exception(f"unrecognised asset result: {other}") + + class ExecutionState: def __init__( self, - execution_id: int, + execution_id: str, module: str, target: str, arguments: list[t.Any], serialiser_configs: list[config.SerialiserConfig], blob_threshold: int, blob_store_configs: list[config.BlobStoreConfig], + log_store_config: config.LogStoreConfig, server_host: str, secure: bool, server_connection: server.Connection, loop: asyncio.AbstractEventLoop, + run_id: str, + workspace_id: str, ): self._id = execution_id + self._run_id = run_id + self._server_host = server_host + self._secure = secure self._server = server_connection self._loop = loop self._timestamp = time.time() # TODO: better name self._status = ExecutionStatus.STARTING + mp_context = multiprocessing.get_context("spawn") parent_conn, child_conn = mp_context.Pipe() self._connection = parent_conn @@ -695,9 +757,12 @@ def __init__( serialiser_configs, blob_threshold, blob_store_configs, + log_store_config, server_host, secure, child_conn, + run_id, + workspace_id, ), name=f"Execution-{execution_id}", ) @@ -794,10 +859,6 @@ def _handle_notify(self, message): self._server_notify("cancel", (execution_id,)) case RegisterGroupRequest(execution_id, group_id, name): self._server_notify("register_group", (execution_id, group_id, name)) - case LogMessageRequest(level, template, values, timestamp): - self._server_notify( - "log_messages", ([self._id, timestamp, level, template, values],) - ) case other: raise Exception(f"Received unhandled notify: {other!r}") @@ -837,6 +898,7 @@ def _handle_request(self, request_id, request): requires, ), request_id, + _parse_submit_result, ) case ResolveReferenceRequest(execution_id, timeout): # TODO: set (and unset) state on Execution to indicate waiting? 
@@ -848,7 +910,7 @@ def _handle_request(self, request_id, request): _parse_result, ) case PersistAssetRequest(name, entries): - self._server_request("put_asset", (self._id, name, entries), request_id) + self._server_request("put_asset", (self._id, name, entries), request_id, _parse_asset_result) case GetAssetRequest(asset_id): self._server_request("get_asset", (asset_id, self._id), request_id) case other: @@ -889,12 +951,14 @@ def __init__( serialiser_configs: list[config.SerialiserConfig], blob_threshold: int, blob_store_configs: list[config.BlobStoreConfig], + log_store_config: config.LogStoreConfig, ): self._connection = connection self._serialiser_configs = serialiser_configs self._blob_threshold = blob_threshold self._blob_store_configs = blob_store_configs - self._executions: dict[int, ExecutionState] = {} + self._log_store_config = log_store_config + self._executions: dict[str, ExecutionState] = {} self._last_heartbeat_sent = None async def _declare_targets( @@ -931,13 +995,15 @@ async def run(self, targets: dict[str, dict[types.TargetType, list[str]]]) -> No def execute( self, - execution_id: int, + execution_id: str, module: str, target: str, arguments: list[types.Value], server_host: str, secure: bool, loop: asyncio.AbstractEventLoop, + run_id: str, + workspace_id: str, ) -> None: if execution_id in self._executions: raise Exception(f"Execution ({execution_id}) already running") @@ -949,10 +1015,13 @@ def execute( self._serialiser_configs, self._blob_threshold, self._blob_store_configs, + self._log_store_config, server_host, secure, self._connection, loop, + run_id, + workspace_id, ) threading.Thread( target=self._run_execution, @@ -971,7 +1040,7 @@ def _run_execution( future = asyncio.run_coroutine_threadsafe(coro, loop) future.result() - def abort(self, execution_id: int, timeout: int = 5) -> bool: + def abort(self, execution_id: str, timeout: int = 5) -> bool: execution = self._executions.get(execution_id) if not execution: return False diff --git a/clients/python/coflux/logs.py b/clients/python/coflux/logs.py new file mode 100644 index 00000000..46097df8 --- /dev/null +++ b/clients/python/coflux/logs.py @@ -0,0 +1,260 @@ +"""Log store abstraction for writing logs to external storage. + +This module provides a pluggable interface for storing log messages, +with an HTTP implementation that POSTs logs to the Coflux server's +/logs endpoint. +""" + +import abc +import threading +import time +import typing as t +from dataclasses import dataclass +from queue import Empty, Queue + +import httpx + +from . 
import config + + +@dataclass +class LogEntry: + """A log entry to be written to the log store.""" + + run_id: str + execution_id: str + workspace_id: str + timestamp: int + level: int + template: str | None + values: dict[str, t.Any] + + +def _format_values(values: dict[str, t.Any]) -> dict[str, t.Any]: + """Format serialized values for HTTP log posting.""" + return {key: _format_value(value) for key, value in values.items()} + + +def _format_value(value: t.Any) -> dict[str, t.Any]: + """Format a single serialized value for HTTP log posting.""" + match value: + case ("raw", data, references): + return { + "type": "raw", + "data": data, + "references": [_format_reference(r) for r in references], + } + case ("blob", key, size, references): + return { + "type": "blob", + "key": key, + "size": size, + "references": [_format_reference(r) for r in references], + } + case _: + return {"type": "raw", "data": value, "references": []} + + +def _format_reference(ref: t.Any) -> dict[str, t.Any]: + """Format a reference for HTTP log posting, including metadata.""" + match ref: + case ("execution", execution_id, metadata): + return { + "type": "execution", + "executionId": execution_id, + "runId": metadata.run_id, + "stepId": metadata.step_id, + "attempt": metadata.attempt, + "module": metadata.module, + "target": metadata.target, + } + case ("asset", asset_id, metadata): + return { + "type": "asset", + "assetId": asset_id, + "externalId": metadata.external_id, + "name": metadata.name, + "totalCount": metadata.total_count, + "totalSize": metadata.total_size, + } + case ("fragment", format_, blob_key, size, metadata): + return { + "type": "fragment", + "format": format_, + "blobKey": blob_key, + "size": size, + "metadata": metadata, + } + case _: + raise ValueError(f"Unknown reference type: {ref}") + + +class LogStore(abc.ABC): + """Abstract base class for log stores.""" + + @abc.abstractmethod + def write( + self, + run_id: str, + execution_id: str, + workspace_id: str, + timestamp: int, + level: int, + template: str | None, + values: dict[str, t.Any], + ) -> None: + """Write a log message to the store.""" + ... + + def __enter__(self) -> "LogStore": + return self + + def __exit__(self, *_) -> None: + pass + + +class HttpLogStore(LogStore): + """HTTP-based log store that POSTs logs to the Coflux server. + + Buffers logs and sends them in batches for efficiency. + """ + + def __init__( + self, + base_url: str, + *, + batch_size: int = 100, + flush_interval: float = 0.5, + secure: bool = False, + ): + """Initialize the HTTP log store. 
+ + Args: + base_url: The base URL of the Coflux server (e.g., "localhost:7777") + batch_size: Maximum number of logs to batch before sending + flush_interval: Maximum time (seconds) between flushes + secure: Whether to use HTTPS + """ + protocol = "https" if secure else "http" + self._url = f"{protocol}://{base_url}/logs" + self._batch_size = batch_size + self._flush_interval = flush_interval + + self._queue: Queue[LogEntry] = Queue() + self._running = False + self._thread: threading.Thread | None = None + + def write( + self, + run_id: str, + execution_id: str, + workspace_id: str, + timestamp: int, + level: int, + template: str | None, + values: dict[str, t.Any], + ) -> None: + """Queue a log message for batched sending.""" + entry = LogEntry(run_id, execution_id, workspace_id, timestamp, level, template, values) + self._queue.put(entry) + + def __enter__(self) -> "HttpLogStore": + self._running = True + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() + return self + + def __exit__(self, *_) -> None: + self._running = False + if self._thread: + self._thread.join(timeout=5.0) + self._flush_all() + + def _run(self) -> None: + """Background thread that periodically flushes the queue.""" + last_flush = time.time() + buffer: list[LogEntry] = [] + + while self._running: + try: + entry = self._queue.get(timeout=0.1) + buffer.append(entry) + + # Flush if buffer is full or interval elapsed + if len(buffer) >= self._batch_size or ( + time.time() - last_flush > self._flush_interval and buffer + ): + self._flush(buffer) + buffer = [] + last_flush = time.time() + + except Empty: + # Check if we should flush on interval + if buffer and time.time() - last_flush > self._flush_interval: + self._flush(buffer) + buffer = [] + last_flush = time.time() + + # Flush remaining on stop + if buffer: + self._flush(buffer) + + def _flush_all(self) -> None: + """Flush all remaining items in the queue.""" + buffer: list[LogEntry] = [] + while True: + try: + entry = self._queue.get_nowait() + buffer.append(entry) + except Empty: + break + if buffer: + self._flush(buffer) + + def _flush(self, entries: list[LogEntry]) -> None: + """Send buffered logs to the server.""" + if not entries: + return + + try: + payload = { + "messages": [self._format_entry(e) for e in entries], + } + response = httpx.post(self._url, json=payload, timeout=10) + response.raise_for_status() + except httpx.HTTPError as e: + # Log error but don't crash - logs are best-effort + print(f"Failed to send logs: {e}") + + def _format_entry(self, entry: LogEntry) -> dict[str, t.Any]: + """Format a log entry for the API.""" + return { + "runId": entry.run_id, + "executionId": entry.execution_id, + "workspaceId": entry.workspace_id, + "timestamp": entry.timestamp, + "level": entry.level, + "template": entry.template, + "values": _format_values(entry.values), + } + + +def create_log_store( + store_config: config.LogStoreConfig, + *, + server_host: str, + server_secure: bool = False, +) -> LogStore: + """Create a log store from configuration.""" + match store_config: + case config.HTTPLogStoreConfig(): + host = store_config.host if store_config.host is not None else server_host + secure = store_config.secure if store_config.secure is not None else server_secure + return HttpLogStore( + host, + batch_size=store_config.batch_size, + flush_interval=store_config.flush_interval, + secure=secure, + ) + case _: + raise ValueError(f"Unknown log store config type: {store_config}") diff --git a/clients/python/coflux/models.py 
b/clients/python/coflux/models.py index 878eb125..8b97cfa9 100644 --- a/clients/python/coflux/models.py +++ b/clients/python/coflux/models.py @@ -52,15 +52,48 @@ def _get_channel(): return execution.get_channel() -# TODO: make non-tuple? -class Execution[T](t.NamedTuple): - id: int +class ExecutionMetadata(t.NamedTuple): + """Metadata for an execution reference, used in log serialization.""" + + run_id: str | None = None + step_id: str | None = None + attempt: int | None = None + module: str | None = None + target: str | None = None + + +class Execution[T]: + """Reference to a step execution.""" + + def __init__( + self, + id: str, + metadata: ExecutionMetadata | None = None, + ): + self._id = id + self._metadata = metadata or ExecutionMetadata() + + @property + def id(self) -> str: + return self._id + + @property + def metadata(self) -> ExecutionMetadata: + return self._metadata def result(self) -> T: - return _get_channel().resolve_reference(self.id) + return _get_channel().resolve_reference(self._id) def cancel(self) -> None: - _get_channel().cancel_execution(self.id) + _get_channel().cancel_execution(self._id) + + def __eq__(self, other: object) -> bool: + if isinstance(other, Execution): + return self._id == other._id + return False + + def __hash__(self) -> int: + return hash(self._id) # TODO: (make non-tuple?) @@ -78,14 +111,32 @@ def restore(self, *, at: Path | str | None = None) -> Path: return target +class AssetMetadata(t.NamedTuple): + """Metadata for an asset reference, used in log serialization.""" + + external_id: str | None = None + name: str | None = None + total_count: int | None = None + total_size: int | None = None + + class Asset: - def __init__(self, id: int): + def __init__( + self, + id: str, + metadata: AssetMetadata | None = None, + ): self._id = id + self._metadata = metadata or AssetMetadata() @property - def id(self): + def id(self) -> str: return self._id + @property + def metadata(self) -> AssetMetadata: + return self._metadata + @functools.cached_property def entries(self) -> list[AssetEntry]: return _get_channel().resolve_asset(self._id) diff --git a/clients/python/coflux/serialisation.py b/clients/python/coflux/serialisation.py index ed393310..76d44a5e 100644 --- a/clients/python/coflux/serialisation.py +++ b/clients/python/coflux/serialisation.py @@ -158,10 +158,12 @@ def _serialise(value: t.Any) -> t.Any: elif isinstance(value, models.Execution): # TODO: better handle id being none assert value.id is not None - references.append(("execution", value.id)) + # Include metadata in reference for log storage + references.append(("execution", value.id, value.metadata)) return {"type": "ref", "index": len(references) - 1} elif isinstance(value, models.Asset): - references.append(("asset", value.id)) + # Include metadata in reference for log storage + references.append(("asset", value.id, value.metadata)) return {"type": "ref", "index": len(references) - 1} elif isinstance(value, tuple): # TODO: include name @@ -219,10 +221,10 @@ def _deserialise(data: t.Any): case "ref": reference = references[data["index"]] match reference: - case ("execution", execution_id): - return models.Execution(execution_id) - case ("asset", asset_id): - return models.Asset(asset_id) + case ("execution", execution_id, exec_metadata): + return models.Execution(execution_id, exec_metadata) + case ("asset", asset_id, asset_metadata): + return models.Asset(asset_id, asset_metadata) case ("fragment", format, blob_key, _size, metadata): data = self._blob_manager.get(blob_key) for serialiser 
in self._serialisers: @@ -234,6 +236,8 @@ def _deserialise(data: t.Any): raise Exception( f"Couldn't deserialise fragment ({format})" ) + case other: + raise ValueError(f"Unknown reference type: {other}") case other: raise Exception(f"unhandled data type ({other})") else: diff --git a/clients/python/coflux/types.py b/clients/python/coflux/types.py index 68c15596..0877c4d8 100644 --- a/clients/python/coflux/types.py +++ b/clients/python/coflux/types.py @@ -1,5 +1,8 @@ import typing as t +if t.TYPE_CHECKING: + from . import models + TargetType = t.Literal["workflow", "task"] Requires = dict[str, list[str]] @@ -7,8 +10,8 @@ Metadata = dict[str, t.Any] Reference = ( - tuple[t.Literal["execution"], int] - | tuple[t.Literal["asset"], int] + tuple[t.Literal["execution"], str, "models.ExecutionMetadata"] + | tuple[t.Literal["asset"], str, "models.AssetMetadata"] | tuple[t.Literal["fragment"], str, str, int, dict[str, t.Any]] ) diff --git a/clients/python/coflux/worker.py b/clients/python/coflux/worker.py index 38c9f834..3eb294c1 100644 --- a/clients/python/coflux/worker.py +++ b/clients/python/coflux/worker.py @@ -13,14 +13,27 @@ def _parse_reference(reference: t.Any) -> types.Reference: match reference: - case ["execution", execution_id]: - return ("execution", execution_id) - case ["asset", asset_id]: - return ("asset", asset_id) + case ["execution", execution_id, run_id, step_id, attempt, module, target]: + metadata = models.ExecutionMetadata( + run_id=run_id, + step_id=step_id, + attempt=attempt, + module=module, + target=target, + ) + return ("execution", execution_id, metadata) + case ["asset", external_id, name, total_count, total_size]: + metadata = models.AssetMetadata( + external_id=external_id, + name=name, + total_count=total_count, + total_size=total_size, + ) + return ("asset", external_id, metadata) case ["fragment", serialiser, blob_key, size, metadata]: return ("fragment", serialiser, blob_key, size, metadata) case other: - raise Exception(f"unexpected reference: {other}") + raise ValueError(f"Unknown reference format: {other}") def _parse_references(references: list[t.Any]) -> list[types.Reference]: @@ -50,6 +63,7 @@ def __init__( serialiser_configs: list[config.SerialiserConfig], blob_threshold: int, blob_store_configs: list[config.BlobStoreConfig], + log_store_config: config.LogStoreConfig, session_id: str, targets: dict[str, dict[str, tuple[models.Target, t.Callable]]], ): @@ -62,7 +76,7 @@ def __init__( {"execute": self._handle_execute, "abort": self._handle_abort} ) self._execution_manager = execution.Manager( - self._connection, serialiser_configs, blob_threshold, blob_store_configs + self._connection, serialiser_configs, blob_threshold, blob_store_configs, log_store_config ) def __enter__(self): @@ -72,13 +86,14 @@ def __exit__(self, exc_type, exc_value, traceback): self._execution_manager.abort_all() async def _handle_execute(self, *args) -> None: - (execution_id, module_name, target_name, arguments) = args + (execution_id, module_name, target_name, arguments, run_id, workspace_id) = args print(f"Handling execute '{target_name}' ({execution_id})...") target = self._targets[module_name][target_name][1].__name__ arguments = [_parse_value(a) for a in arguments] loop = asyncio.get_running_loop() self._execution_manager.execute( - execution_id, module_name, target, arguments, self._server_host, self._secure, loop + execution_id, module_name, target, arguments, self._server_host, self._secure, loop, + run_id, workspace_id ) async def _handle_abort(self, *args) -> None: diff 
--git a/server/lib/coflux/application.ex b/server/lib/coflux/application.ex index 7d1a498c..12a160ec 100644 --- a/server/lib/coflux/application.ex +++ b/server/lib/coflux/application.ex @@ -1,7 +1,7 @@ defmodule Coflux.Application do use Application - alias Coflux.{Config, ProjectStore, Orchestration, Topics} + alias Coflux.{Config, ProjectStore, Orchestration, Logs, Topics} @impl true def start(_type, _args) do @@ -15,6 +15,8 @@ defmodule Coflux.Application do # TODO: separate launch supervisor per project? (and specify max_children?) {Task.Supervisor, name: Coflux.LauncherSupervisor}, Orchestration.Supervisor, + {Registry, keys: :unique, name: Coflux.Logs.Registry}, + Logs.Supervisor, {Topical, name: Coflux.TopicalRegistry, topics: topics()}, {Coflux.Web, port: port} ] @@ -34,7 +36,6 @@ defmodule Coflux.Application do Topics.Modules, Topics.Run, Topics.Workflow, - Topics.Logs, Topics.Module, Topics.Pools, Topics.Pool, diff --git a/server/lib/coflux/handlers/logs.ex b/server/lib/coflux/handlers/logs.ex new file mode 100644 index 00000000..8a0c69a0 --- /dev/null +++ b/server/lib/coflux/handlers/logs.ex @@ -0,0 +1,287 @@ +defmodule Coflux.Handlers.Logs do + @moduledoc """ + HTTP handler for the logs endpoint. + + - POST /logs - Accept batched log messages + - GET /logs?run_id=X - Query logs (JSON) or subscribe (SSE) + + Project is resolved from the request (COFLUX_PROJECT env or subdomain). + SSE is enabled when Accept: text/event-stream header is present. + """ + + import Coflux.Handlers.Utils + + alias Coflux.Logs + + def init(req, opts) do + req = set_cors_headers(req) + + case :cowboy_req.method(req) do + "OPTIONS" -> + req = :cowboy_req.reply(204, req) + {:ok, req, opts} + + "POST" -> + handle_post(req, opts) + + "GET" -> + handle_get(req, opts) + + _ -> + req = json_error_response(req, "method_not_allowed", status: 405) + {:ok, req, opts} + end + end + + ## POST /logs - Write log entries + + defp handle_post(req, opts) do + case resolve_project(req) do + {:ok, project_id} -> + handle_post_with_project(req, opts, project_id) + + {:error, :not_configured} -> + req = json_error_response(req, "not_configured", status: 500) + {:ok, req, opts} + + {:error, :project_required} -> + req = json_error_response(req, "project_required", status: 400) + {:ok, req, opts} + + {:error, :invalid_host} -> + req = json_error_response(req, "invalid_host", status: 400) + {:ok, req, opts} + end + end + + defp handle_post_with_project(req, opts, project_id) do + case read_json_body(req) do + {:ok, body, req} -> + with {:ok, messages} <- get_required(body, "messages"), + {:ok, entries} <- parse_messages(messages) do + # Route to per-project logs server (async, fire-and-forget) + :ok = Logs.Server.write_logs(project_id, entries) + req = json_response(req, 200, %{"ok" => true}) + {:ok, req, opts} + else + {:error, field, reason} -> + req = json_error_response(req, "invalid_request", details: %{"field" => field, "reason" => reason}) + {:ok, req, opts} + end + + {:error, :invalid_json} -> + req = json_error_response(req, "invalid_json") + {:ok, req, opts} + end + end + + ## GET /logs - Query or subscribe to logs + + defp handle_get(req, opts) do + case resolve_project(req) do + {:ok, project_id} -> + handle_get_with_project(req, opts, project_id) + + {:error, :not_configured} -> + req = json_error_response(req, "not_configured", status: 500) + {:ok, req, opts} + + {:error, :project_required} -> + req = json_error_response(req, "project_required", status: 400) + {:ok, req, opts} + + {:error, :invalid_host} -> + req 
= json_error_response(req, "invalid_host", status: 400) + {:ok, req, opts} + end + end + + defp handle_get_with_project(req, opts, project_id) do + qs = :cowboy_req.parse_qs(req) + accept = :cowboy_req.header("accept", req, "application/json") + + run_id = get_query_param(qs, "run") + execution_id = get_query_param(qs, "execution", &String.to_integer/1) + workspace_ids = parse_id_list(get_query_param(qs, "workspaces")) + after_cursor = get_query_param(qs, "after") + + cond do + is_nil(run_id) -> + req = json_error_response(req, "run_required") + {:ok, req, opts} + + String.contains?(accept, "text/event-stream") -> + # SSE mode - subscribe and stream + handle_sse(req, opts, project_id, run_id, execution_id, workspace_ids) + + true -> + # JSON mode - query and return + query_opts = [ + run_id: run_id, + execution_id: execution_id, + workspace_ids: workspace_ids, + after: after_cursor + ] + + case Logs.Server.query_logs(project_id, query_opts) do + {:ok, entries, cursor} -> + result = %{ + "logs" => Enum.map(entries, &format_entry/1), + "cursor" => cursor + } + + req = json_response(req, 200, result) + {:ok, req, opts} + + {:error, reason} -> + req = json_error_response(req, "query_failed", details: %{"reason" => inspect(reason)}) + {:ok, req, opts} + end + end + end + + ## SSE Handling + + defp handle_sse(req, opts, project_id, run_id, execution_id, workspace_ids) do + subscribe_opts = + [] + |> then(fn o -> if execution_id, do: [{:execution_id, execution_id} | o], else: o end) + |> then(fn o -> if workspace_ids != [], do: [{:workspace_ids, workspace_ids} | o], else: o end) + + case Logs.Server.subscribe(project_id, run_id, self(), subscribe_opts) do + {:ok, ref, initial_entries} -> + headers = %{ + "content-type" => "text/event-stream", + "cache-control" => "no-cache", + "connection" => "keep-alive" + } + + req = :cowboy_req.stream_reply(200, headers, req) + send_sse_data(req, Enum.map(initial_entries, &format_entry/1)) + {:cowboy_loop, req, %{ref: ref, project_id: project_id}} + + {:error, _reason} -> + req = json_error_response(req, "subscription_failed", status: 500) + {:ok, req, opts} + end + end + + def info({:logs, ref, entries}, req, %{ref: ref} = state) do + if Enum.any?(entries) do + send_sse_data(req, Enum.map(entries, &format_entry/1)) + end + + {:ok, req, state} + end + + def info(_msg, req, state) do + {:ok, req, state} + end + + def terminate(_reason, _req, %{ref: ref, project_id: project_id}) do + Logs.Server.unsubscribe(project_id, ref) + :ok + end + + def terminate(_reason, _req, _state) do + :ok + end + + ## Helpers + + defp get_required(body, field) do + case Map.fetch(body, field) do + {:ok, value} when not is_nil(value) -> {:ok, value} + _ -> {:error, field, "required"} + end + end + + defp parse_messages(messages) when is_list(messages) do + Enum.reduce_while(messages, {:ok, []}, fn msg, {:ok, acc} -> + values = Map.get(msg, "values", %{}) + + if valid_values?(values) do + entry = %{ + run_id: Map.get(msg, "runId"), + execution_id: parse_integer(Map.get(msg, "executionId")), + workspace_id: parse_integer(Map.get(msg, "workspaceId")), + timestamp: Map.get(msg, "timestamp"), + level: Map.get(msg, "level"), + template: Map.get(msg, "template"), + values: values + } + + {:cont, {:ok, [entry | acc]}} + else + {:halt, {:error, "values", "invalid structure"}} + end + end) + |> case do + {:ok, entries} -> {:ok, Enum.reverse(entries)} + error -> error + end + end + + defp parse_messages(_), do: {:error, "messages", "must be an array"} + + defp parse_integer(value) when 
is_integer(value), do: value + defp parse_integer(value) when is_binary(value), do: String.to_integer(value) + defp parse_integer(nil), do: nil + + defp parse_id_list(nil), do: [] + defp parse_id_list(""), do: [] + defp parse_id_list(value) when is_binary(value) do + value + |> String.split(",") + |> Enum.map(&String.to_integer/1) + end + + # Validation functions - check structure but pass through as-is + + defp valid_values?(values) when is_map(values) do + Enum.all?(values, fn {_key, value} -> valid_value?(value) end) + end + + defp valid_values?(_), do: false + + defp valid_value?(%{"type" => "raw", "references" => refs}) when is_list(refs) do + Enum.all?(refs, &valid_reference?/1) + end + + defp valid_value?(%{"type" => "blob", "key" => key, "size" => size, "references" => refs}) + when is_binary(key) and is_integer(size) and is_list(refs) do + Enum.all?(refs, &valid_reference?/1) + end + + defp valid_value?(_), do: false + + defp valid_reference?(%{"type" => "execution", "executionId" => id}) + when is_binary(id) or is_integer(id), + do: true + + defp valid_reference?(%{"type" => "asset", "assetId" => id}) + when is_binary(id) or is_integer(id), + do: true + + defp valid_reference?(%{"type" => "fragment", "format" => f, "blobKey" => k, "size" => s}) + when is_binary(f) and is_binary(k) and is_integer(s), + do: true + + defp valid_reference?(_), do: false + + defp format_entry(entry) do + %{ + "executionId" => Integer.to_string(entry.execution_id), + "workspaceId" => Integer.to_string(entry.workspace_id), + "timestamp" => entry.timestamp, + "level" => entry.level, + "template" => entry.template, + "values" => entry.values + } + end + + defp send_sse_data(req, logs) do + message = "data: #{Jason.encode!(logs)}\n\n" + :cowboy_req.stream_body(message, :nofin, req) + end +end diff --git a/server/lib/coflux/handlers/utils.ex b/server/lib/coflux/handlers/utils.ex index eb7d6285..7d19c960 100644 --- a/server/lib/coflux/handlers/utils.ex +++ b/server/lib/coflux/handlers/utils.ex @@ -228,4 +228,13 @@ defmodule Coflux.Handlers.Utils do nil end end + + @doc """ + Get all values for a query parameter (for repeated params like ?workspace=a&workspace=b). 
+ """ + def get_query_params(qs, key) do + qs + |> Enum.filter(fn {k, _v} -> k == key end) + |> Enum.map(fn {_k, v} -> v end) + end end diff --git a/server/lib/coflux/handlers/worker.ex b/server/lib/coflux/handlers/worker.ex index e5de6d24..9b88608a 100644 --- a/server/lib/coflux/handlers/worker.ex +++ b/server/lib/coflux/handlers/worker.ex @@ -109,6 +109,7 @@ defmodule Coflux.Handlers.Worker do "register_group" -> [parent_id, group_id, name] = message["params"] + parent_id = String.to_integer(parent_id) if is_recognised_execution?(parent_id, state) do case( @@ -142,6 +143,7 @@ defmodule Coflux.Handlers.Worker do recurrent, requires ] = message["params"] + parent_id = String.to_integer(parent_id) if is_recognised_execution?(parent_id, state) do case Orchestration.schedule_step( @@ -161,8 +163,18 @@ defmodule Coflux.Handlers.Worker do recurrent: recurrent == true, requires: requires ) do - {:ok, _run_id, _step_id, execution_id} -> - {[success_message(message["id"], execution_id)], state} + {:ok, _run_id, _step_id, execution_id, metadata} -> + # Return execution_id with metadata for log references + result = [ + Integer.to_string(execution_id), + metadata.run_id, + metadata.step_id, + metadata.attempt, + metadata.module, + metadata.target + ] + + {[success_message(message["id"], result)], state} {:error, error} -> {[error_message(message["id"], error)], state} @@ -190,6 +202,7 @@ defmodule Coflux.Handlers.Worker do "notify_terminated" -> [execution_ids] = message["params"] + execution_ids = Enum.map(execution_ids, &String.to_integer/1) # TODO: just ignore? if Enum.all?(execution_ids, &is_recognised_execution?(&1, state)) do @@ -206,6 +219,7 @@ defmodule Coflux.Handlers.Worker do "put_result" -> [execution_id, value] = message["params"] + execution_id = String.to_integer(execution_id) if is_recognised_execution?(execution_id, state) do :ok = @@ -222,6 +236,7 @@ defmodule Coflux.Handlers.Worker do "put_error" -> [execution_id, error] = message["params"] + execution_id = String.to_integer(execution_id) if is_recognised_execution?(execution_id, state) do {type, message, frames} = parse_error(error) @@ -240,6 +255,7 @@ defmodule Coflux.Handlers.Worker do "cancel" -> [execution_id] = message["params"] + execution_id = String.to_integer(execution_id) # TODO: restrict which executions can be cancelled? 
:ok = Orchestration.cancel_execution(state.project_id, execution_id) @@ -247,6 +263,7 @@ defmodule Coflux.Handlers.Worker do "suspend" -> [execution_id, execute_after] = message["params"] + execution_id = String.to_integer(execution_id) # TODO: validate execute_after if is_recognised_execution?(execution_id, state) do @@ -264,6 +281,8 @@ defmodule Coflux.Handlers.Worker do "get_result" -> [execution_id, from_execution_id, timeout_ms] = message["params"] + execution_id = String.to_integer(execution_id) + from_execution_id = String.to_integer(from_execution_id) if is_recognised_execution?(from_execution_id, state) do case Orchestration.get_result( @@ -285,6 +304,7 @@ defmodule Coflux.Handlers.Worker do "put_asset" -> [execution_id, name, entries] = message["params"] + execution_id = String.to_integer(execution_id) # TODO: validate @@ -294,16 +314,26 @@ defmodule Coflux.Handlers.Worker do {path, blob_key, size, metadata} end) - {:ok, asset_id} = + {:ok, asset_id, metadata} = Orchestration.put_asset(state.project_id, execution_id, name, entries) - {[success_message(message["id"], asset_id)], state} + # Return asset_id with metadata for log references + result = [ + Integer.to_string(asset_id), + metadata.external_id, + metadata.name, + metadata.total_count, + metadata.total_size + ] + + {[success_message(message["id"], result)], state} else {[{:close, 4000, "execution_invalid"}], nil} end "get_asset" -> [asset_id, from_execution_id] = message["params"] + from_execution_id = String.to_integer(from_execution_id) if is_recognised_execution?(from_execution_id, state) do case Orchestration.get_asset_entries(state.project_id, asset_id, from_execution_id) do @@ -322,30 +352,6 @@ defmodule Coflux.Handlers.Worker do {[{:close, 4000, "execution_invalid"}], nil} end - "log_messages" -> - messages = - Enum.reduce( - message["params"], - %{}, - fn [execution_id, timestamp, level, template, values], acc -> - values = Map.new(values, fn {k, v} -> {k, parse_value(v)} end) - message = {timestamp, parse_level(level), template, values} - - acc - |> Map.put_new(execution_id, []) - |> Map.update!(execution_id, &[message | &1]) - end - ) - - if Enum.all?(Map.keys(messages), &is_recognised_execution?(&1, state)) do - Enum.each(messages, fn {execution_id, messages} -> - Orchestration.record_logs(state.project_id, execution_id, Enum.reverse(messages)) - end) - - {[], state} - else - {[{:close, 4000, "execution_invalid"}], nil} - end end end @@ -353,10 +359,10 @@ defmodule Coflux.Handlers.Worker do {[], state} end - def websocket_info({:execute, execution_id, module, target, arguments}, state) do + def websocket_info({:execute, execution_id, module, target, arguments, run_id, workspace_id}, state) do arguments = Enum.map(arguments, &compose_value/1) state = Map.update!(state, :execution_ids, &MapSet.put(&1, execution_id)) - {[command_message("execute", [execution_id, module, target, arguments])], state} + {[command_message("execute", [Integer.to_string(execution_id), module, target, arguments, run_id, Integer.to_string(workspace_id)])], state} end def websocket_info({:result, request_id, result}, state) do @@ -364,7 +370,7 @@ defmodule Coflux.Handlers.Worker do end def websocket_info({:abort, execution_id}, state) do - {[command_message("abort", [execution_id])], state} + {[command_message("abort", [Integer.to_string(execution_id)])], state} end def websocket_info(:stop, state) do @@ -375,6 +381,7 @@ defmodule Coflux.Handlers.Worker do MapSet.member?(state.execution_ids, execution_id) end + defp 
session_message(session_id) do {:text, Jason.encode!([0, session_id])} end @@ -481,11 +488,12 @@ defmodule Coflux.Handlers.Worker do {:fragment, format, blob_key, size, metadata} -> ["fragment", format, blob_key, size, metadata] - {:execution, execution_id} -> - ["execution", execution_id] + {:execution, execution_id, metadata} -> + ["execution", Integer.to_string(execution_id), metadata.run_id, metadata.step_id, metadata.attempt, + metadata.module, metadata.target] - {:asset, asset_id} -> - ["asset", asset_id] + {:asset, external_id, {name, total_count, total_size, _entry}} -> + ["asset", external_id, name, total_count, total_size] end) end @@ -510,17 +518,6 @@ defmodule Coflux.Handlers.Worker do end end - defp parse_level(level) do - case level do - 0 -> :debug - 1 -> :stdout - 2 -> :info - 3 -> :stderr - 4 -> :warning - 5 -> :error - end - end - defp parse_websocket_protocols(req) do case :cowboy_req.parse_header("sec-websocket-protocol", req) do :undefined -> [] diff --git a/server/lib/coflux/logs/server.ex b/server/lib/coflux/logs/server.ex new file mode 100644 index 00000000..d162dc10 --- /dev/null +++ b/server/lib/coflux/logs/server.ex @@ -0,0 +1,299 @@ +defmodule Coflux.Logs.Server do + @moduledoc """ + Per-project GenServer for log storage with write buffering. + + Features: + - Buffers writes in memory for batch inserts + - Flushes on interval or when buffer is full + - Pub/sub for real-time log streaming + - Template deduplication caching + """ + + use GenServer + + require Logger + + alias Coflux.Logs.Store + + @flush_interval_ms 500 + @max_buffer_size 1000 + + defmodule State do + @moduledoc false + defstruct [ + :project_id, + :db, + :flush_timer, + buffer: [], + # run_id -> %{ref -> {pid, execution_id, workspace_ids}} + subscribers: %{}, + # template_hash -> template_id + template_cache: %{} + ] + end + + ## Client API + + def start_link(opts) do + project_id = Keyword.fetch!(opts, :project_id) + + GenServer.start_link(__MODULE__, project_id, + name: {:via, Registry, {Coflux.Logs.Registry, project_id}} + ) + end + + @doc """ + Write a batch of log entries. + + Each entry should be a map with: + - run_id: string + - execution_id: integer + - workspace_id: integer + - timestamp: integer (unix ms) + - level: integer (0-5) + - template: string or nil + - values: map + """ + def write_logs(project_id, entries) when is_list(entries) do + {:ok, server} = Coflux.Logs.Supervisor.get_server(project_id) + GenServer.cast(server, {:write_logs, entries}) + end + + @doc """ + Query logs for a run. + + Options: + - :run_id (required) + - :execution_id - filter by execution + - :workspace_ids - list of workspace IDs to include + - :after - cursor for pagination + - :limit - max results + """ + def query_logs(project_id, opts) do + {:ok, server} = Coflux.Logs.Supervisor.get_server(project_id) + GenServer.call(server, {:query_logs, opts}) + end + + @doc """ + Subscribe to real-time log updates for a run. + + Options: + - :execution_id - filter to only logs from this execution + - :workspace_ids - list of workspace IDs to include + + Returns {:ok, ref, initial_logs} where ref is used to unsubscribe. + The subscriber will receive {:logs, ref, entries} messages. + """ + def subscribe(project_id, run_id, pid, opts \\ []) do + {:ok, server} = Coflux.Logs.Supervisor.get_server(project_id) + GenServer.call(server, {:subscribe, run_id, pid, opts}) + end + + @doc """ + Unsubscribe from log updates. 
+ """ + def unsubscribe(project_id, ref) do + case Coflux.Logs.Supervisor.get_server(project_id) do + {:ok, server} -> GenServer.cast(server, {:unsubscribe, ref}) + _ -> :ok + end + end + + ## Server Callbacks + + @impl true + def init(project_id) do + case Store.open(project_id) do + {:ok, db} -> + {:ok, %State{project_id: project_id, db: db}} + + {:error, reason} -> + {:stop, reason} + end + end + + @impl true + def handle_cast({:write_logs, entries}, state) do + state = %{state | buffer: state.buffer ++ entries} + state = maybe_flush(state) + state = schedule_flush(state) + {:noreply, state} + end + + @impl true + def handle_cast({:unsubscribe, ref}, state) do + state = do_unsubscribe(state, ref) + {:noreply, state} + end + + @impl true + def handle_call({:query_logs, opts}, _from, state) do + # Flush buffer first to include recent writes + state = flush_buffer(state) + result = Store.query_logs(state.db, opts) + {:reply, result, state} + end + + @impl true + def handle_call({:subscribe, run_id, pid, opts}, _from, state) do + ref = make_ref() + execution_id = Keyword.get(opts, :execution_id) + workspace_ids = Keyword.get(opts, :workspace_ids) + + # Monitor the subscriber + Process.monitor(pid) + + # Flush buffer first + state = flush_buffer(state) + + {:ok, initial_logs, _cursor} = + Store.query_logs(state.db, run_id: run_id, execution_id: execution_id, workspace_ids: workspace_ids) + + # Add to subscribers with optional filters + # Store as {pid, execution_id, workspace_ids} + state = + state + |> update_in( + [Access.key(:subscribers), Access.key(run_id, %{})], + &Map.put(&1, ref, {pid, execution_id, workspace_ids}) + ) + + {:reply, {:ok, ref, initial_logs}, state} + end + + @impl true + def handle_info(:flush, state) do + state = flush_buffer(state) + {:noreply, %{state | flush_timer: nil}} + end + + @impl true + def handle_info({:DOWN, _monitor_ref, :process, pid, _reason}, state) do + # Find and remove all subscriptions for this pid + state = remove_subscriber_by_pid(state, pid) + {:noreply, state} + end + + @impl true + def terminate(_reason, state) do + if state.db do + Store.close(state.db) + end + + :ok + end + + ## Private Functions + + defp flush_buffer(%{buffer: []} = state), do: state + + defp flush_buffer(state) do + case Store.insert_logs(state.db, state.buffer, state.template_cache) do + {:ok, template_cache} -> + notify_subscribers(state, state.buffer) + %{state | buffer: [], template_cache: template_cache} + + {:error, reason} -> + Logger.warning("Failed to flush logs: #{inspect(reason)}") + %{state | buffer: []} + end + end + + defp maybe_flush(state) when length(state.buffer) >= @max_buffer_size do + flush_buffer(state) + end + + defp maybe_flush(state), do: state + + defp schedule_flush(%{flush_timer: nil} = state) do + timer = Process.send_after(self(), :flush, @flush_interval_ms) + %{state | flush_timer: timer} + end + + defp schedule_flush(state), do: state + + defp notify_subscribers(state, entries) do + # Group entries by run_id + entries_by_run = Enum.group_by(entries, & &1.run_id) + + Enum.each(entries_by_run, fn {run_id, run_entries} -> + case Map.get(state.subscribers, run_id) do + nil -> + :ok + + subs -> + Enum.each(subs, fn {ref, {pid, filter_execution_id, filter_workspace_ids}} -> + filtered_entries = + run_entries + |> maybe_filter_by_execution_id(filter_execution_id) + |> maybe_filter_by_workspace_ids(filter_workspace_ids) + + if Enum.any?(filtered_entries) do + formatted_entries = Enum.map(filtered_entries, &format_entry_for_notification/1) + 
send(pid, {:logs, ref, formatted_entries}) + end + end) + end + end) + end + + defp maybe_filter_by_execution_id(entries, nil), do: entries + defp maybe_filter_by_execution_id(entries, execution_id) do + Enum.filter(entries, &(&1.execution_id == execution_id)) + end + + defp maybe_filter_by_workspace_ids(entries, nil), do: entries + defp maybe_filter_by_workspace_ids(entries, []), do: entries + defp maybe_filter_by_workspace_ids(entries, workspace_ids) do + Enum.filter(entries, &(&1.workspace_id in workspace_ids)) + end + + defp format_entry_for_notification(entry) do + Map.take(entry, [:execution_id, :workspace_id, :timestamp, :level, :template, :values]) + end + + defp do_unsubscribe(state, ref) do + # Find which run_id this ref belongs to and remove it + {state, _} = + Enum.reduce(state.subscribers, {state, false}, fn + {run_id, subs}, {state, false} -> + if Map.has_key?(subs, ref) do + new_subs = Map.delete(subs, ref) + + state = + if map_size(new_subs) == 0 do + update_in(state, [:subscribers], &Map.delete(&1, run_id)) + else + put_in(state, [:subscribers, run_id], new_subs) + end + + {state, true} + else + {state, false} + end + + _, acc -> + acc + end) + + state + end + + defp remove_subscriber_by_pid(state, pid) do + # Find and remove all subscriptions for this pid + new_subscribers = + state.subscribers + |> Enum.map(fn {run_id, subs} -> + new_subs = + subs + |> Enum.reject(fn {_ref, {sub_pid, _execution_id, _workspace_ids}} -> sub_pid == pid end) + |> Map.new() + + {run_id, new_subs} + end) + |> Enum.reject(fn {_run_id, subs} -> map_size(subs) == 0 end) + |> Map.new() + + %{state | subscribers: new_subscribers} + end +end diff --git a/server/lib/coflux/logs/store.ex b/server/lib/coflux/logs/store.ex new file mode 100644 index 00000000..f99a977f --- /dev/null +++ b/server/lib/coflux/logs/store.ex @@ -0,0 +1,237 @@ +defmodule Coflux.Logs.Store do + @moduledoc """ + SQLite database operations for logs storage. + + Provides functions for: + - Opening/creating logs databases + - Template deduplication (for space efficiency) + - Batch insert of log entries + - Querying logs with filters + """ + + alias Coflux.Store + + @doc """ + Opens or creates a logs database for the given project. + """ + def open(project_id) do + Store.open(project_id, "logs") + end + + @doc """ + Close the database connection. + """ + def close(db) do + Store.close(db) + end + + @doc """ + Get or create a template, returning its ID. + + Uses SHA256 hash for fast lookup and deduplication. + """ + def get_or_create_template(db, template) when is_binary(template) do + hash = :crypto.hash(:sha256, template) + + # Try to find existing template + case Store.query_one(db, "SELECT id FROM templates WHERE hash = ?1", {{:blob, hash}}) do + {:ok, {id}} -> + {:ok, id} + + {:ok, nil} -> + # Insert new template + Store.insert_one(db, "templates", hash: {:blob, hash}, template: template) + end + end + + def get_or_create_template(_db, nil), do: {:ok, nil} + + @doc """ + Insert a batch of log entries. + + Each entry should be a map with: + - run_id: string + - execution_id: integer + - workspace_id: integer + - timestamp: integer (unix ms) + - level: integer (0-5) + - template: string or nil + - values: map of values (will be JSON encoded) + + Returns {:ok, template_cache} on success. 
+ """ + def insert_logs(db, entries, template_cache) when is_list(entries) do + now = System.system_time(:millisecond) + + # Process entries and update template cache + {rows, template_cache} = + Enum.map_reduce(entries, template_cache, fn entry, t_cache -> + {template_id, t_cache} = resolve_template(db, entry.template, t_cache) + + # Values are already validated JSON maps - store directly + values_json = + if entry.values && map_size(entry.values) > 0 do + Jason.encode!(entry.values) + else + nil + end + + row = { + entry.run_id, + entry.execution_id, + entry.workspace_id, + entry.timestamp, + entry.level, + template_id, + values_json, + now + } + + {row, t_cache} + end) + + fields = {:run_id, :execution_id, :workspace_id, :timestamp, :level, :template_id, :values_json, :created_at} + + case Store.insert_many(db, "messages", fields, rows) do + {:ok, _ids} -> {:ok, template_cache} + {:error, reason} -> {:error, reason} + end + end + + defp resolve_template(_db, nil, cache), do: {nil, cache} + + defp resolve_template(db, template, cache) do + hash = :crypto.hash(:sha256, template) + + case Map.get(cache, hash) do + nil -> + {:ok, id} = get_or_create_template(db, template) + {id, Map.put(cache, hash, id)} + + id -> + {id, cache} + end + end + + @doc """ + Query logs for a run with optional filters. + + Options: + - :run_id (required) - the run ID to query + - :execution_id - filter by execution ID + - :workspace_ids - list of workspace IDs to include + - :after - cursor for pagination (timestamp:id format) + - :limit - max entries to return (default 1000) + """ + def query_logs(db, opts) do + run_id = Keyword.fetch!(opts, :run_id) + execution_id = Keyword.get(opts, :execution_id) + workspace_ids = Keyword.get(opts, :workspace_ids) + after_cursor = Keyword.get(opts, :after) + limit = Keyword.get(opts, :limit, 1000) + + {where_clauses, args} = build_where_clauses(run_id, execution_id, workspace_ids, after_cursor) + + sql = """ + SELECT m.id, m.run_id, m.execution_id, m.workspace_id, m.timestamp, m.level, t.template, m.values_json, m.created_at + FROM messages m + LEFT JOIN templates t ON m.template_id = t.id + WHERE #{Enum.join(where_clauses, " AND ")} + ORDER BY m.timestamp ASC, m.id ASC + LIMIT ?#{length(args) + 1} + """ + + args = List.to_tuple(args ++ [limit]) + + case Store.query(db, sql, args) do + {:ok, rows} -> + entries = Enum.map(rows, &row_to_entry/1) + cursor = build_cursor(List.last(entries)) + {:ok, entries, cursor} + end + end + + defp build_where_clauses(run_id, execution_id, workspace_ids, after_cursor) do + clauses = ["m.run_id = ?1"] + args = [run_id] + + {clauses, args} = + if execution_id do + {clauses ++ ["m.execution_id = ?#{length(args) + 1}"], args ++ [execution_id]} + else + {clauses, args} + end + + {clauses, args} = + if workspace_ids && workspace_ids != [] do + # Build IN clause with positional parameters + start_idx = length(args) + 1 + placeholders = workspace_ids |> Enum.with_index(start_idx) |> Enum.map(fn {_, i} -> "?#{i}" end) + clause = "m.workspace_id IN (#{Enum.join(placeholders, ", ")})" + {clauses ++ [clause], args ++ workspace_ids} + else + {clauses, args} + end + + {clauses, args} = + if after_cursor do + case parse_cursor(after_cursor) do + {:ok, timestamp, id} -> + clause = "(m.timestamp > ?#{length(args) + 1} OR (m.timestamp = ?#{length(args) + 1} AND m.id > ?#{length(args) + 2}))" + {clauses ++ [clause], args ++ [timestamp, id]} + + :error -> + {clauses, args} + end + else + {clauses, args} + end + + {clauses, args} + end + + defp 
parse_cursor(cursor) when is_binary(cursor) do + case String.split(cursor, ":") do + [timestamp_str, id_str] -> + case {Integer.parse(timestamp_str), Integer.parse(id_str)} do + {{timestamp, ""}, {id, ""}} -> {:ok, timestamp, id} + _ -> :error + end + + _ -> + :error + end + end + + defp parse_cursor(_), do: :error + + defp build_cursor(nil), do: nil + + defp build_cursor(entry) do + "#{entry.timestamp}:#{entry.id}" + end + + defp row_to_entry({id, run_id, execution_id, workspace_id, timestamp, level, template, values_json, _created_at}) do + # Values are stored as validated JSON - return directly + values = + if values_json do + case Jason.decode(values_json) do + {:ok, v} when is_map(v) -> v + _ -> %{} + end + else + %{} + end + + %{ + id: id, + run_id: run_id, + execution_id: execution_id, + workspace_id: workspace_id, + timestamp: timestamp, + level: level, + template: template, + values: values + } + end +end diff --git a/server/lib/coflux/logs/supervisor.ex b/server/lib/coflux/logs/supervisor.ex new file mode 100644 index 00000000..0525c883 --- /dev/null +++ b/server/lib/coflux/logs/supervisor.ex @@ -0,0 +1,48 @@ +defmodule Coflux.Logs.Supervisor do + @moduledoc """ + DynamicSupervisor for per-project log servers. + + Manages a pool of Logs.Server processes, one per active project. + Servers are started on-demand when logs are written or subscribed to. + """ + + use DynamicSupervisor + + alias Coflux.Logs.Server + + def start_link(opts) do + DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(_opts) do + DynamicSupervisor.init(strategy: :one_for_one) + end + + @doc """ + Get or start a logs server for the given project. + + Returns {:ok, pid} if successful. + """ + def get_server(project_id) do + case Registry.lookup(Coflux.Logs.Registry, project_id) do + [{pid, _}] -> + {:ok, pid} + + [] -> + start_server(project_id) + end + end + + defp start_server(project_id) do + spec = {Server, project_id: project_id} + + case DynamicSupervisor.start_child(__MODULE__, spec) do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + {:ok, pid} + end + end +end diff --git a/server/lib/coflux/orchestration.ex b/server/lib/coflux/orchestration.ex index 8f220086..98acafb1 100644 --- a/server/lib/coflux/orchestration.ex +++ b/server/lib/coflux/orchestration.ex @@ -119,10 +119,6 @@ defmodule Coflux.Orchestration do call_server(project_id, {:get_asset_by_external_id, asset_external_id}) end - def record_logs(project_id, execution_id, messages) do - call_server(project_id, {:record_logs, execution_id, messages}) - end - def subscribe_workspaces(project_id, pid) do call_server(project_id, {:subscribe_workspaces, pid}) end @@ -155,10 +151,6 @@ defmodule Coflux.Orchestration do call_server(project_id, {:subscribe_run, run_id, pid}) end - def subscribe_logs(project_id, run_id, pid) do - call_server(project_id, {:subscribe_logs, run_id, pid}) - end - def subscribe_targets(project_id, workspace_id, pid) do call_server(project_id, {:subscribe_targets, workspace_id, pid}) end diff --git a/server/lib/coflux/orchestration/observations.ex b/server/lib/coflux/orchestration/observations.ex deleted file mode 100644 index bd31a8bd..00000000 --- a/server/lib/coflux/orchestration/observations.ex +++ /dev/null @@ -1,166 +0,0 @@ -defmodule Coflux.Orchestration.Observations do - import Coflux.Store - - alias Coflux.Orchestration.Values - - def record_logs(db, execution_id, messages) do - with_transaction(db, fn -> - now = current_timestamp() - - 
Enum.each(messages, fn {timestamp, level, template, values} -> - {:ok, template_id} = - if template do - get_or_create_template(db, template) - else - {:ok, nil} - end - - {:ok, message_id} = - insert_one(db, :messages, %{ - execution_id: execution_id, - timestamp: timestamp, - level: encode_level(level), - template_id: template_id, - created_at: now - }) - - {:ok, _} = - insert_many( - db, - :message_values, - {:message_id, :label_id, :value_id}, - Enum.map(values, fn {label, value} -> - {:ok, label_id} = get_or_create_label(db, label) - {:ok, value_id} = Values.get_or_create_value(db, value) - {message_id, label_id, value_id} - end) - ) - end) - end) - end - - def get_messages_for_run(db, run_id) do - case query( - db, - """ - SELECT m.id, m.execution_id, m.timestamp, m.level, m.template_id - FROM messages AS m - INNER JOIN executions AS e ON e.id = m.execution_id - INNER JOIN steps AS s ON s.id = e.step_id - WHERE s.run_id = ?1 - ORDER BY m.timestamp - """, - {run_id} - ) do - {:ok, rows} -> - messages = - Enum.map(rows, fn {message_id, execution_id, timestamp, level, template_id} -> - # TODO: batch? - {:ok, template} = - if template_id do - get_template_by_id(db, template_id) - else - {:ok, nil} - end - - {:ok, values} = get_values_for_message(db, message_id) - {execution_id, timestamp, decode_level(level), template, values} - end) - - {:ok, messages} - end - end - - def get_counts_for_run(db, run_id) do - case query( - db, - """ - SELECT m.execution_id, count(*) - FROM messages AS m - INNER JOIN executions AS e ON e.id = m.execution_id - INNER JOIN steps AS s ON s.id = e.step_id - WHERE s.run_id = ?1 - GROUP BY m.execution_id - """, - {run_id} - ) do - {:ok, rows} -> {:ok, Map.new(rows)} - end - end - - defp get_template_by_id(db, template_id) do - case query_one(db, "SELECT template FROM message_templates WHERE id = ?1", {template_id}) do - {:ok, {template}} -> {:ok, template} - end - end - - defp get_values_for_message(db, message_id) do - case query( - db, - """ - SELECT ml.label, mv.value_id - FROM message_values AS mv - INNER JOIN message_labels AS ml ON ml.id = mv.label_id - WHERE mv.message_id = ?1 - """, - {message_id} - ) do - {:ok, rows} -> - {:ok, - Map.new(rows, fn {label, value_id} -> - {:ok, value} = Values.get_value_by_id(db, value_id) - {label, value} - end)} - end - end - - defp get_or_create_template(db, template) do - case query_one(db, "SELECT id FROM message_templates WHERE template = ?1", {template}) do - {:ok, {template_id}} -> - {:ok, template_id} - - {:ok, nil} -> - case insert_one(db, :message_templates, %{template: template}) do - {:ok, template_id} -> {:ok, template_id} - end - end - end - - defp get_or_create_label(db, label) do - case query_one(db, "SELECT id FROM message_labels WHERE label = ?1", {label}) do - {:ok, {label_id}} -> - {:ok, label_id} - - {:ok, nil} -> - case insert_one(db, :message_labels, %{label: label}) do - {:ok, label_id} -> {:ok, label_id} - end - end - end - - defp encode_level(level) do - case level do - :debug -> 0 - :stdout -> 1 - :info -> 2 - :stderr -> 3 - :warning -> 4 - :error -> 5 - end - end - - defp decode_level(level) do - case level do - 0 -> :debug - 1 -> :stdout - 2 -> :info - 3 -> :stderr - 4 -> :warning - 5 -> :error - end - end - - defp current_timestamp() do - System.os_time(:millisecond) - end -end diff --git a/server/lib/coflux/orchestration/server.ex b/server/lib/coflux/orchestration/server.ex index fd354b94..1f3d9c1d 100644 --- a/server/lib/coflux/orchestration/server.ex +++ 
b/server/lib/coflux/orchestration/server.ex @@ -13,8 +13,7 @@ defmodule Coflux.Orchestration.Server do CacheConfigs, TagSets, Workers, - Manifests, - Observations + Manifests } @default_activation_timeout_ms 600_000 @@ -805,7 +804,16 @@ defmodule Coflux.Orchestration.Server do ) |> flush_notifications() - {:reply, {:ok, run.external_id, external_step_id, execution_id}, state} + # Return extended metadata for log references + execution_metadata = %{ + run_id: run.external_id, + step_id: external_step_id, + attempt: attempt, + module: module, + target: target_name + } + + {:reply, {:ok, run.external_id, external_step_id, execution_id, execution_metadata}, state} end end @@ -1091,6 +1099,13 @@ defmodule Coflux.Orchestration.Server do {:wait, state} {:ok, result} -> + # Only enrich value results with resolved references (asset metadata, execution metadata) + # Other result types (error, abandoned, etc.) don't need enrichment for the client + result = + case result do + {:value, value} -> {:value, build_value(value, state.db)} + other -> other + end {{:ok, result}, state} end @@ -1104,17 +1119,25 @@ defmodule Coflux.Orchestration.Server do {:ok, asset_id} = Assets.get_or_create_asset(state.db, name, entries) :ok = Results.put_execution_asset(state.db, execution_id, asset_id) - {external_id, name, total_count, total_size, entry} = resolve_asset(state.db, asset_id) + {external_id, asset_name, total_count, total_size, entry} = resolve_asset(state.db, asset_id) state = state |> notify_listeners( {:run, run_id}, - {:asset, execution_id, external_id, {name, total_count, total_size, entry}} + {:asset, execution_id, external_id, {asset_name, total_count, total_size, entry}} ) |> flush_notifications() - {:reply, {:ok, asset_id}, state} + # Return extended metadata for log references + asset_metadata = %{ + external_id: external_id, + name: asset_name, + total_count: total_count, + total_size: total_size + } + + {:reply, {:ok, asset_id, asset_metadata}, state} end def handle_call({:get_asset_entries, asset_id, from_execution_id}, _from, state) do @@ -1141,27 +1164,6 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:record_logs, execution_id, messages}, _from, state) do - {:ok, run_id} = Runs.get_execution_run_id(state.db, execution_id) - - case Observations.record_logs(state.db, execution_id, messages) do - :ok -> - messages = - Enum.map(messages, fn {timestamp, level, template, values} -> - {execution_id, timestamp, level, template, - Map.new(values, fn {k, v} -> {k, build_value(v, state.db)} end)} - end) - - state = - state - |> notify_listeners({:logs, run_id}, {:messages, messages}) - |> notify_listeners({:run, run_id}, {:log_counts, execution_id, length(messages)}) - |> flush_notifications() - - {:reply, :ok, state} - end - end - def handle_call({:subscribe_workspaces, pid}, _from, state) do {:ok, ref, state} = add_listener(state, :workspaces, pid) @@ -1365,7 +1367,6 @@ defmodule Coflux.Orchestration.Server do {:ok, run_executions} = Runs.get_run_executions(state.db, run.id) {:ok, run_dependencies} = Runs.get_run_dependencies(state.db, run.id) {:ok, run_children} = Runs.get_run_children(state.db, run.id) - {:ok, log_counts} = Observations.get_counts_for_run(state.db, run.id) {:ok, groups} = Runs.get_groups_for_run(state.db, run.id) cache_configs = @@ -1465,8 +1466,7 @@ defmodule Coflux.Orchestration.Server do assets: assets, dependencies: dependencies, result: result, - children: Map.get(run_children, execution_id, []), - log_count: Map.get(log_counts, execution_id, 0) + 
children: Map.get(run_children, execution_id, []) }} end) }} @@ -1477,23 +1477,6 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:subscribe_logs, external_run_id, pid}, _from, state) do - case Runs.get_run_by_external_id(state.db, external_run_id) do - {:ok, run} -> - case Observations.get_messages_for_run(state.db, run.id) do - {:ok, messages} -> - messages = - Enum.map(messages, fn {execution_id, timestamp, level, template, values} -> - {execution_id, timestamp, level, template, - Map.new(values, fn {k, v} -> {k, build_value(v, state.db)} end)} - end) - - {:ok, ref, state} = add_listener(state, {:logs, run.id}, pid) - {:reply, {:ok, ref, messages}, state} - end - end - end - def handle_call({:subscribe_targets, workspace_id, pid}, _from, state) do # TODO: indicate which are archived {:ok, workflows} = Manifests.get_all_workflows_for_workspace(state.db, workspace_id) @@ -1672,6 +1655,9 @@ defmodule Coflux.Orchestration.Server do {:ok, assigned_at} = Runs.assign_execution(state.db, execution.execution_id, session_id) + # Enrich arguments with resolved references (asset/execution metadata) + enriched_arguments = Enum.map(arguments, &build_value(&1, state.db)) + state = state |> update_in( @@ -1681,7 +1667,7 @@ defmodule Coflux.Orchestration.Server do |> send_session( session_id, {:execute, execution.execution_id, execution.module, execution.target, - arguments} + enriched_arguments, execution.run_external_id, execution.workspace_id} ) {state, [{execution, assigned_at} | assigned], unassigned} @@ -2983,6 +2969,13 @@ defmodule Coflux.Orchestration.Server do {:ok, result} -> state = Map.put(state, :waiting, waiting) + # Enrich value results with resolved references (asset metadata, execution metadata) + result = + case result do + {:value, value} -> {:value, build_value(value, state.db)} + other -> other + end + Enum.reduce( execution_waiting, state, diff --git a/server/lib/coflux/topics/logs.ex b/server/lib/coflux/topics/logs.ex deleted file mode 100644 index d206ee13..00000000 --- a/server/lib/coflux/topics/logs.ex +++ /dev/null @@ -1,110 +0,0 @@ -defmodule Coflux.Topics.Logs do - use Topical.Topic, route: ["runs", :run_id, "logs", :workspace_id] - - alias Coflux.Orchestration - - import Coflux.TopicUtils - - def connect(params, context) do - {:ok, Map.put(params, :project, context.project)} - end - - def init(params) do - project_id = Map.fetch!(params, :project) - run_id = Map.fetch!(params, :run_id) - workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) - - case Orchestration.subscribe_run(project_id, run_id, self()) do - {:ok, _run, _parent, steps, _ref} -> - case Orchestration.subscribe_logs(project_id, run_id, self()) do - {:ok, _ref, messages} -> - run_workspace_id = - steps - |> Map.values() - |> Enum.reject(& &1.parent_id) - |> Enum.min_by(& &1.created_at) - |> Map.fetch!(:executions) - |> Map.values() - |> Enum.min_by(& &1.created_at) - |> Map.fetch!(:workspace_id) - - workspace_ids = Enum.uniq([run_workspace_id, workspace_id]) - - execution_ids = - steps - |> Map.values() - |> Enum.flat_map(fn step -> - step.executions - |> Map.values() - |> Enum.filter(&(&1.workspace_id in workspace_ids)) - |> Enum.map(& &1.execution_id) - end) - |> MapSet.new() - - topic = - messages - |> Enum.filter(&(elem(&1, 0) in execution_ids)) - |> Enum.map(&build_message/1) - |> Topic.new(%{ - workspace_ids: workspace_ids, - execution_ids: execution_ids - }) - - {:ok, topic} - end - end - end - - def handle_info({:topic, _ref, notifications}, topic) do - topic = 
Enum.reduce(notifications, topic, &process_notification(&2, &1)) - {:ok, topic} - end - - defp process_notification(topic, {:execution, _, _, execution_id, workspace_id, _, _, _}) do - if workspace_id in topic.state.workspace_ids do - update_in(topic.state.execution_ids, &MapSet.put(&1, execution_id)) - else - topic - end - end - - defp process_notification(topic, {:messages, messages}) do - messages = - messages - |> Enum.filter(&(elem(&1, 0) in topic.state.execution_ids)) - |> Enum.map(&build_message/1) - - Topic.insert(topic, [], messages) - end - - defp process_notification(topic, {:step, _, _, _}), do: topic - defp process_notification(topic, {:group, _, _, _}), do: topic - defp process_notification(topic, {:asset, _, _, _}), do: topic - defp process_notification(topic, {:assigned, _}), do: topic - defp process_notification(topic, {:result_dependency, _, _, _}), do: topic - defp process_notification(topic, {:child, _, _}), do: topic - defp process_notification(topic, {:result, _, _, _}), do: topic - defp process_notification(topic, {:result_result, _, _, _}), do: topic - defp process_notification(topic, {:log_counts, _, _}), do: topic - - defp build_message({execution_id, timestamp, level, template, values}) do - [ - Integer.to_string(execution_id), - timestamp, - encode_level(level), - template, - Map.new(values, fn {k, v} -> {k, build_value(v)} end) - ] - end - - defp encode_level(level) do - case level do - :debug -> 0 - :stdout -> 1 - :info -> 2 - :stderr -> 3 - :warning -> 4 - :error -> 5 - end - end -end diff --git a/server/lib/coflux/topics/run.ex b/server/lib/coflux/topics/run.ex index caa11241..df07ed47 100644 --- a/server/lib/coflux/topics/run.ex +++ b/server/lib/coflux/topics/run.ex @@ -92,8 +92,7 @@ defmodule Coflux.Topics.Run do {Integer.to_string(dependency_id), build_dependency(dependency)} end), children: [], - result: nil, - logCount: 0 + result: nil } ) else @@ -173,14 +172,6 @@ defmodule Coflux.Topics.Run do end) end - defp process_notification(topic, {:log_counts, execution_id, delta}) do - update_execution(topic, execution_id, fn topic, base_path -> - path = base_path ++ [:logCount] - count = get_in(topic.value, path) + delta - Topic.set(topic, base_path ++ [:logCount], count) - end) - end - defp build_run(run, parent, steps, workspace_ids) do %{ createdAt: run.created_at, @@ -226,8 +217,7 @@ defmodule Coflux.Topics.Run do end), dependencies: build_dependencies(execution.dependencies), children: Enum.map(execution.children, &build_child/1), - result: build_result(execution.result), - logCount: execution.log_count + result: build_result(execution.result) }} end) }} diff --git a/server/lib/coflux/topics/workspaces.ex b/server/lib/coflux/topics/workspaces.ex index 5d8216c4..69efa577 100644 --- a/server/lib/coflux/topics/workspaces.ex +++ b/server/lib/coflux/topics/workspaces.ex @@ -35,7 +35,7 @@ defmodule Coflux.Topics.Workspaces do defp build_workspace(workspace) do %{ name: workspace.name, - baseId: workspace.base_id, + baseId: if(workspace.base_id, do: Integer.to_string(workspace.base_id)), state: build_state(workspace.state) } end diff --git a/server/lib/coflux/web.ex b/server/lib/coflux/web.ex index d5612cb7..9d164961 100644 --- a/server/lib/coflux/web.ex +++ b/server/lib/coflux/web.ex @@ -14,6 +14,7 @@ defmodule Coflux.Web do [ {"/.well-known/com.coflux", Handlers.WellKnown, []}, {"/blobs/:key", Handlers.Blobs, []}, + {"/logs", Handlers.Logs, []}, {"/worker", Handlers.Worker, []}, {"/topics", Handlers.Topics, registry: Coflux.TopicalRegistry}, {"/api/[...]", 
Handlers.Api, []},
diff --git a/server/priv/migrations/logs/1.sql b/server/priv/migrations/logs/1.sql
new file mode 100644
index 00000000..f08a1841
--- /dev/null
+++ b/server/priv/migrations/logs/1.sql
@@ -0,0 +1,24 @@
+-- Template deduplication (biggest space savings)
+CREATE TABLE templates (
+  id INTEGER PRIMARY KEY,
+  hash BLOB NOT NULL UNIQUE,
+  template TEXT NOT NULL
+) STRICT;
+
+-- Log messages
+CREATE TABLE messages (
+  id INTEGER PRIMARY KEY,
+  run_id TEXT NOT NULL,
+  execution_id INTEGER NOT NULL,
+  workspace_id INTEGER NOT NULL,
+  timestamp INTEGER NOT NULL,
+  level INTEGER NOT NULL,
+  template_id INTEGER REFERENCES templates(id),
+  values_json TEXT,
+  created_at INTEGER NOT NULL
+) STRICT;
+
+-- Indexes for common query patterns
+CREATE INDEX idx_messages_run_workspace_ts ON messages(run_id, workspace_id, timestamp);
+CREATE INDEX idx_messages_execution_ts ON messages(execution_id, timestamp);
+CREATE INDEX idx_messages_created ON messages(created_at);
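+
+-- Illustrative shape of the query issued by Coflux.Logs.Store.query_logs,
+-- which the indexes above are intended to serve. Literal values are
+-- placeholders; the timestamp/id pair corresponds to an `after` cursor of
+-- "1700000000000:42" (timestamp:id format).
+--
+--   SELECT m.id, m.run_id, m.execution_id, m.workspace_id, m.timestamp,
+--          m.level, t.template, m.values_json, m.created_at
+--   FROM messages m
+--   LEFT JOIN templates t ON m.template_id = t.id
+--   WHERE m.run_id = 'run_abc123'
+--     AND m.workspace_id IN (1, 2)
+--     AND (m.timestamp > 1700000000000
+--          OR (m.timestamp = 1700000000000 AND m.id > 42))
+--   ORDER BY m.timestamp ASC, m.id ASC
+--   LIMIT 1000;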