Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions clients/python/coflux/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ def _init(
serialiser_configs: list[config.SerialiserConfig],
blob_threshold: int,
blob_store_configs: list[config.BlobStoreConfig],
log_store_config: config.LogStoreConfig,
concurrency: int,
session_id: str | None,
register: bool,
Expand Down Expand Up @@ -310,6 +311,7 @@ def _init(
serialiser_configs,
blob_threshold,
blob_store_configs,
log_store_config,
session_id,
targets,
) as worker:
Expand Down Expand Up @@ -822,7 +824,7 @@ def blobs_get(host: str, secure: bool | None, key: str):
raise click.ClickException("Blob store not configured")

out = click.get_binary_stream("stdout")
with BlobManager(config.blobs.stores, host, secure=use_secure) as blob_manager:
with BlobManager(config.blobs.stores, server_host=host, server_secure=use_secure) as blob_manager:
blob = blob_manager.get(key)
for chunk in iter(lambda: blob.read(64 * 1024), b""):
out.write(chunk)
Expand Down Expand Up @@ -965,7 +967,7 @@ def assets_download(

total_size = sum(v["size"] for v in entries.values())

with BlobManager(config.blobs.stores, host, secure=use_secure) as blob_manager:
with BlobManager(config.blobs.stores, server_host=host, server_secure=use_secure) as blob_manager:
click.echo(f"Downloading {len(entries)} files ({_human_size(total_size)})...")
# TODO: parallelise downloads
with click.progressbar(entries.items(), label="") as bar:
Expand Down Expand Up @@ -1076,6 +1078,7 @@ def worker(
"serialiser_configs": config and config.serialisers,
"blob_threshold": config and config.blobs and config.blobs.threshold,
"blob_store_configs": config and config.blobs and config.blobs.stores,
"log_store_config": config and config.logs and config.logs.store,
"concurrency": concurrency,
"session_id": session,
"register": register or dev,
Expand Down
16 changes: 8 additions & 8 deletions clients/python/coflux/blobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def upload(self, path: Path) -> str:


class HttpStore(Store):
def __init__(self, protocol: t.Literal["http", "https"], host: str):
self._protocol = protocol
def __init__(self, host: str, *, secure: bool = False):
self._protocol = "https" if secure else "http"
self._host = host

def __enter__(self):
Expand Down Expand Up @@ -161,20 +161,20 @@ def upload(self, path: Path) -> str:
return self.put(file)


def _create(config_: config.BlobStoreConfig, server_host: str, server_secure: bool):
    """Instantiate the concrete blob `Store` described by `config_`.

    For HTTP stores, a `host`/`secure` of None in the config falls back to
    the server connection settings (`server_host`/`server_secure`).

    Raises:
        ValueError: if the config's `type` is not a recognised store kind.
    """
    if config_.type == "http":
        # None means "inherit from the server connection".
        host = config_.host if config_.host is not None else server_host
        secure = config_.secure if config_.secure is not None else server_secure
        return HttpStore(host, secure=secure)
    elif config_.type == "s3":
        return S3Store(config_.bucket, config_.prefix, config_.region)
    else:
        raise ValueError("unrecognised blob store config")


class Manager:
def __init__(self, store_configs: list[config.BlobStoreConfig], server_host: str, *, secure: bool):
self._stores = [_create(c, server_host, secure) for c in store_configs]
def __init__(self, store_configs: list[config.BlobStoreConfig], *, server_host: str, server_secure: bool):
self._stores = [_create(c, server_host, server_secure) for c in store_configs]

def __enter__(self):
# TODO: ?
Expand Down
31 changes: 29 additions & 2 deletions clients/python/coflux/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class ServerConfig(pydantic.BaseModel):

class HTTPBlobStoreConfig(pydantic.BaseModel):
    """Config for a blob store reached over plain HTTP(S).

    `host` and `secure` default to None, meaning "inherit the corresponding
    server connection setting" (resolved where the store is constructed).
    """

    type: t.Literal["http"] = "http"
    host: str | None = None
    secure: bool | None = None


class S3BlobStoreConfig(pydantic.BaseModel):
Expand All @@ -29,7 +29,7 @@ class S3BlobStoreConfig(pydantic.BaseModel):
]


def _default_blob_stores() -> list[BlobStoreConfig]:
    """Default blob stores: a single HTTP store inheriting the server settings."""
    # A factory (not a module-level constant) so each model instance gets a
    # fresh list — mutable defaults must not be shared.
    return [HTTPBlobStoreConfig()]


Expand All @@ -38,6 +38,32 @@ class BlobsConfig(pydantic.BaseModel):
stores: list[BlobStoreConfig] = pydantic.Field(default_factory=_default_blob_stores)


class HTTPLogStoreConfig(pydantic.BaseModel):
    """HTTP-based log store that POSTs logs to the Coflux server."""

    type: t.Literal["http"] = "http"
    # Target host; None presumably falls back to the server host, mirroring
    # HTTPBlobStoreConfig — TODO confirm against the log store consumer.
    host: str | None = None
    # Use HTTPS when True; None presumably inherits the server's secure
    # setting — TODO confirm against the log store consumer.
    secure: bool | None = None
    # NOTE(review): batch_size/flush_interval look like buffering knobs
    # (records per POST, seconds between flushes) — verify where consumed.
    batch_size: int = 100
    flush_interval: float = 0.5


# Discriminated union of log-store configs, keyed on the `type` field
# (parallel to BlobStoreConfig). HTTP is currently the only variant; the
# Annotated/discriminator form keeps the shape ready for more variants.
LogStoreConfig = t.Annotated[
    HTTPLogStoreConfig,
    pydantic.Field(discriminator="type"),
]


def _default_log_store() -> LogStoreConfig:
    """Default log store: HTTP with all settings left to their fallbacks."""
    # Return annotation added for consistency with _default_blob_stores.
    return HTTPLogStoreConfig()


class LogsConfig(pydantic.BaseModel):
    """Configuration for log storage."""

    # Which log store to use; defaults to an HTTP store with every field at
    # its fallback value. default_factory keeps instances from sharing state.
    store: LogStoreConfig = pydantic.Field(default_factory=_default_log_store)


class PandasSerialiserConfig(pydantic.BaseModel):
type: t.Literal["pandas"] = "pandas"

Expand Down Expand Up @@ -78,6 +104,7 @@ class Config(pydantic.BaseModel):
default_factory=_default_serialisers
)
blobs: BlobsConfig = pydantic.Field(default_factory=BlobsConfig)
logs: LogsConfig = pydantic.Field(default_factory=LogsConfig)


class DockerLauncherConfig(pydantic.BaseModel):
Expand Down
Loading