From 43074324583831aee2a638c99f3c25d34a633117 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Fri, 13 Mar 2026 11:23:10 +0000 Subject: [PATCH 1/4] Blueprint config (CLI, envvar, file) --- dimos/core/blueprints.py | 24 +++-- dimos/core/module.py | 6 ++ dimos/core/module_coordinator.py | 6 +- dimos/core/test_blueprints.py | 8 ++ dimos/core/worker.py | 3 + dimos/core/worker_manager.py | 7 +- dimos/robot/cli/dimos.py | 88 ++++++++++++++++++- dimos/robot/cli/test_dimos.py | 87 ++++++++++++++++++ dimos/visualization/rerun/bridge.py | 4 + .../web/websocket_vis/websocket_vis_module.py | 2 +- docs/usage/blueprints.md | 39 ++++++++ 11 files changed, 261 insertions(+), 13 deletions(-) create mode 100644 dimos/robot/cli/test_dimos.py diff --git a/dimos/core/blueprints.py b/dimos/core/blueprints.py index abfeb29b2f..1559c55de3 100644 --- a/dimos/core/blueprints.py +++ b/dimos/core/blueprints.py @@ -22,6 +22,8 @@ from types import MappingProxyType from typing import TYPE_CHECKING, Any, Literal, get_args, get_origin, get_type_hints +from pydantic import BaseModel, create_model + if TYPE_CHECKING: from dimos.protocol.service.system_configurator.base import SystemConfigurator @@ -130,6 +132,11 @@ def create(cls, module: type[ModuleBase], **kwargs: Any) -> "Blueprint": def disabled_modules(self, *modules: type[ModuleBase]) -> "Blueprint": return replace(self, disabled_modules_tuple=self.disabled_modules_tuple + modules) + def config(self) -> BaseModel: + configs = {b.module.name: (b.module.default_config | None, None) for b in self.blueprints} + configs["g"] = (GlobalConfig | None, None) + return create_model("BlueprintConfig", __config__={"extra": "forbid"}, **configs) + def transports(self, transports: dict[tuple[str, type], Any]) -> "Blueprint": return replace(self, transport_map=MappingProxyType({**self.transport_map, **transports})) @@ -274,13 +281,16 @@ def _verify_no_name_conflicts(self) -> None: raise ValueError("\n".join(error_lines)) def _deploy_all_modules( - self, module_coordinator: ModuleCoordinator, global_config: GlobalConfig + self, + module_coordinator: ModuleCoordinator, + global_config: GlobalConfig, + blueprint_args: dict[str, dict[str, Any]], ) -> None: module_specs: list[ModuleSpec] = [] for blueprint in self._active_blueprints: - module_specs.append((blueprint.module, global_config, blueprint.kwargs)) + module_specs.append((blueprint.module, global_config, blueprint.kwargs.copy())) - module_coordinator.deploy_parallel(module_specs) + module_coordinator.deploy_parallel(module_specs, blueprint_args) def _connect_streams(self, module_coordinator: ModuleCoordinator) -> None: # dict when given (final/remapped) stream name+type, provides a list of modules + original (non-remapped) stream names @@ -472,12 +482,12 @@ def _connect_rpc_methods(self, module_coordinator: ModuleCoordinator) -> None: def build( self, - cli_config_overrides: Mapping[str, Any] | None = None, + blueprint_args: Mapping[str, Any] | None = None, ) -> ModuleCoordinator: logger.info("Building the blueprint") global_config.update(**dict(self.global_config_overrides)) - if cli_config_overrides: - global_config.update(**dict(cli_config_overrides)) + if "g" in blueprint_args: + global_config.update(**blueprint_args.pop("g")) self._run_configurators() self._check_requirements() @@ -488,7 +498,7 @@ def build( module_coordinator.start() # all module constructors are called here (each of them setup their own) - self._deploy_all_modules(module_coordinator, global_config) + self._deploy_all_modules(module_coordinator, global_config, blueprint_args or {}) self._connect_streams(module_coordinator) self._connect_rpc_methods(module_coordinator) self._connect_module_refs(module_coordinator) diff --git a/dimos/core/module.py b/dimos/core/module.py index ab21ce17a9..9618f99f29 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -107,6 +107,7 @@ class ModuleBase(Configurable[ModuleConfigT], Resource): def __init__(self, config_args: dict[str, Any]): super().__init__(**config_args) + print("MODULE", self, config_args, self.config) self._module_closed_lock = threading.Lock() self._loop, self._loop_thread = get_loop() self._disposables = CompositeDisposable() @@ -117,6 +118,11 @@ def __init__(self, config_args: dict[str, Any]): except ValueError: ... + @classproperty + def name(self) -> str: + """Name for this module to be used for blueprint configs.""" + return self.__name__.lower() + @property def frame_id(self) -> str: base = self.config.frame_id or self.__class__.__name__ diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index 10227eae93..efed155cf0 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -128,11 +128,13 @@ def deploy( self._deployed_modules[module_class] = module # type: ignore[assignment] return module # type: ignore[return-value] - def deploy_parallel(self, module_specs: list[ModuleSpec]) -> list[ModuleProxy]: + def deploy_parallel( + self, module_specs: list[ModuleSpec], blueprint_args: dict[str, dict[str, Any]] + ) -> list[ModuleProxy]: if not self._client: raise ValueError("Not started") - modules = self._client.deploy_parallel(module_specs) + modules = self._client.deploy_parallel(module_specs, blueprint_args) for (module_class, _, _), module in zip(module_specs, modules, strict=True): self._deployed_modules[module_class] = module # type: ignore[assignment] return modules # type: ignore[return-value] diff --git a/dimos/core/test_blueprints.py b/dimos/core/test_blueprints.py index 19dbf62c74..e7ac64cda4 100644 --- a/dimos/core/test_blueprints.py +++ b/dimos/core/test_blueprints.py @@ -152,6 +152,14 @@ def test_autoconnect() -> None: ) +def test_config() -> None: + blueprint = autoconnect(module_a(), module_b()) + config = blueprint.config() + assert config.model_fields.keys() == {"modulea", "moduleb"} + assert config.model_fields["modulea"].annotation == ModuleA.default_config + assert config.model_fields["moduleb"].annotation == ModuleB.default_config + + def test_transports() -> None: custom_transport = LCMTransport("/custom_topic", Data1) blueprint_set = autoconnect(module_a(), module_b()).transports( diff --git a/dimos/core/worker.py b/dimos/core/worker.py index dca561f16c..fcc64192f5 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -214,6 +214,7 @@ def deploy_module( "module_class": module_class, "kwargs": kwargs, } + print(module_class, kwargs) with self._lock: self._conn.send(request) response = self._conn.recv() @@ -343,6 +344,7 @@ def _worker_loop(conn: Connection, instances: dict[int, Any], worker_id: int) -> module_class = request["module_class"] kwargs = request["kwargs"] module_id = request["module_id"] + print("DEPLOY", module_class, kwargs) instance = module_class(**kwargs) instances[module_id] = instance response["result"] = module_id @@ -376,6 +378,7 @@ def _worker_loop(conn: Connection, instances: dict[int, Any], worker_id: int) -> except Exception as e: response["error"] = f"{e.__class__.__name__}: {e}\n{traceback.format_exc()}" + raise try: conn.send(response) diff --git a/dimos/core/worker_manager.py b/dimos/core/worker_manager.py index 4cd5eec8d7..ff7121934f 100644 --- a/dimos/core/worker_manager.py +++ b/dimos/core/worker_manager.py @@ -61,7 +61,11 @@ def deploy( actor = worker.deploy_module(module_class, global_config, kwargs=kwargs) return RPCClient(actor, module_class) - def deploy_parallel(self, module_specs: Iterable[ModuleSpec]) -> list[RPCClient]: + def deploy_parallel( + self, + module_specs: Iterable[ModuleSpec], + blueprint_args: dict[str, dict[str, Any]], + ) -> list[RPCClient]: if self._closed: raise RuntimeError("WorkerManager is closed") @@ -76,6 +80,7 @@ def deploy_parallel(self, module_specs: Iterable[ModuleSpec]) -> list[RPCClient] for module_class, global_config, kwargs in module_specs: worker = self._select_worker() worker.reserve_slot() + kwargs.update(blueprint_args.get(module_class.name, {})) assignments.append((worker, module_class, global_config, kwargs)) def _deploy( diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 1137a612f3..f160ac841c 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -18,20 +18,32 @@ import inspect import json import os +from pathlib import Path import sys import time +import types from typing import Any, get_args, get_origin import click from dotenv import load_dotenv +from pydantic import BaseModel +from pydantic_core import PydanticUndefined import requests import typer from dimos.agents.mcp.mcp_adapter import McpAdapter, McpError +from dimos.core.blueprints import Blueprint from dimos.core.global_config import GlobalConfig, global_config from dimos.core.run_registry import get_most_recent, is_pid_alive, stop_entry from dimos.utils.logging_config import setup_logger +try: + from gi.repository import GLib +except ImportError: + CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) +else: + CONFIG_DIR = Path(GLib.get_user_config_dir()) + logger = setup_logger() main = typer.Typer( @@ -108,12 +120,72 @@ def callback(**kwargs) -> None: # type: ignore[no-untyped-def] main.callback()(create_dynamic_callback()) # type: ignore[no-untyped-call] +def arghelp(config: BaseModel, blueprint: Blueprint, indent: str = " ", module: str = "") -> str: + output = "" + for k, info in config.model_fields.items(): + if k == "g": + continue + t = info.annotation + if isinstance(t, types.GenericAlias): + # Can't be specified on CLI + continue + + if issubclass(t, BaseModel): + output += f"{indent}{module}{k}:\n" + # Find blueprint atom + bp = next(bp for bp in blueprint.blueprints if bp.module.name == k) + output += arghelp(t, bp, indent=indent + " ", module=module + k + ".") + else: + # Use __name__ to avoid "" style output on basic types. + display_type = t.__name__ if isinstance(t, type) else t + required = "[Required] " if info.is_required() and k not in blueprint.kwargs else "" + d = blueprint.kwargs.get(k, info.default) + default = f" (default: {d})" if d is not PydanticUndefined else "" + output += f"{indent}* {required}{module}{k}: {display_type}{default}\n" + return output + + +def load_config_args(config: BaseModel, args: Iterable[str], path: Path) -> dict[str, Any]: + try: + kwargs = json.loads(path.read_text()) + except (OSError, json.JSONDecodeError): + kwargs = {} + + for k, v in os.environ.items(): + parts = k.lower().split("__") + if parts[0] not in config.model_fields: + continue + d = kwargs + for p in parts[:-1]: + d = d.setdefault(p, {}) + d[parts[-1]] = v + + for arg in args: + k, _, v = arg.partition("=") + parts = k.split(".") + d = kwargs + for p in parts[:-1]: + d = d.setdefault(p, {}) + d[parts[-1]] = v + + # We don't need this config, but this atleast validates the user input first. + # This will help catch misspellings and similar mistakes. + config(**kwargs) + + return kwargs + + @main.command() def run( ctx: typer.Context, robot_types: list[str] = typer.Argument(..., help="Blueprints or modules to run"), daemon: bool = typer.Option(False, "--daemon", "-d", help="Run in background"), disable: list[str] = typer.Option([], "--disable", help="Module names to disable"), + blueprint_args: list[str] = typer.Option((), "--option", "-o"), + config_path: Path = typer.Option( + CONFIG_DIR / "dimos", "--config", "-c", help="Path to config file" + ), + show_help: bool = typer.Option(False, "--help"), ) -> None: """Start a robot blueprint""" logger.info("Starting DimOS") @@ -132,7 +204,7 @@ def run( setup_exception_handler() cli_config_overrides: dict[str, Any] = ctx.obj - global_config.update(**cli_config_overrides) + # global_config.update(**cli_config_overrides) # Clean stale registry entries stale = cleanup_stale() @@ -163,7 +235,17 @@ def run( disabled_classes = tuple(get_module_by_name(name).blueprints[0].module for name in disable) blueprint = blueprint.disabled_modules(*disabled_classes) - coordinator = blueprint.build(cli_config_overrides=cli_config_overrides) + if show_help: + print("Blueprint arguments:") + print(arghelp(blueprint.config(), blueprint)) + return + + blueprint_config = blueprint.config() + kwargs = load_config_args(blueprint_config, blueprint_args, config_path) + if cli_config_overrides: + kwargs["g"] = cli_config_overrides + + coordinator = blueprint.build(kwargs) if daemon: from dimos.core.daemon import ( @@ -465,6 +547,8 @@ def restart( typer.echo(f"Error: failed to restart — {exc}", err=True) raise typer.Exit(1) + sub_command(blueprint_args) + @main.command() def show_config(ctx: typer.Context) -> None: diff --git a/dimos/robot/cli/test_dimos.py b/dimos/robot/cli/test_dimos.py new file mode 100644 index 0000000000..a87fdcc14f --- /dev/null +++ b/dimos/robot/cli/test_dimos.py @@ -0,0 +1,87 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dimos.core.blueprints import autoconnect +from dimos.core.module import Module, ModuleConfig + +# from dimos.robot.cli.dimos import arghelp +from dimos.robot.unitree.go2.connection import GO2Connection +from dimos.visualization.rerun.bridge import RerunBridgeModule, _default_blueprint + + +def test_blueprint_arghelp(): + blueprint = autoconnect(RerunBridgeModule.blueprint(), GO2Connection.blueprint()) + output = arghelp(blueprint.config(), blueprint) + assert output.split("\n") == [ + " rerunbridgemodule:", + " * rerunbridgemodule.frame_id_prefix: str | None (default: None)", + " * rerunbridgemodule.frame_id: str | None (default: None)", + " * rerunbridgemodule.entity_prefix: str (default: world)", + " * rerunbridgemodule.topic_to_entity: collections.abc.Callable[[typing.Any], str] | None (default: None)", + " * rerunbridgemodule.viewer_mode: typing.Literal['native', 'web', 'connect', 'none']", + " * rerunbridgemodule.connect_url: str (default: rerun+http://127.0.0.1:9877/proxy)", + " * rerunbridgemodule.memory_limit: str (default: 25%)", + f" * rerunbridgemodule.blueprint: collections.abc.Callable[rerun.blueprint.api.Blueprint] | None (default: {_default_blueprint})", + " go2connection:", + " * go2connection.frame_id_prefix: str | None (default: None)", + " * go2connection.frame_id: str | None (default: None)", + " * go2connection.ip: str", + "", + ] + + +def test_blueprint_arghelp_extra_args(): + """Test defaults passed to .blueprint() override.""" + + bridge = RerunBridgeModule.blueprint(frame_id_prefix="foo", viewer_mode="web") + blueprint = autoconnect(bridge, GO2Connection.blueprint(ip="1.1.1.1")) + output = arghelp(blueprint.config(), blueprint) + assert output.split("\n") == [ + " rerunbridgemodule:", + " * rerunbridgemodule.frame_id_prefix: str | None (default: foo)", + " * rerunbridgemodule.frame_id: str | None (default: None)", + " * rerunbridgemodule.entity_prefix: str (default: world)", + " * rerunbridgemodule.topic_to_entity: collections.abc.Callable[[typing.Any], str] | None (default: None)", + " * rerunbridgemodule.viewer_mode: typing.Literal['native', 'web', 'connect', 'none'] (default: web)", + " * rerunbridgemodule.connect_url: str (default: rerun+http://127.0.0.1:9877/proxy)", + " * rerunbridgemodule.memory_limit: str (default: 25%)", + f" * rerunbridgemodule.blueprint: collections.abc.Callable[rerun.blueprint.api.Blueprint] | None (default: {_default_blueprint})", + " go2connection:", + " * go2connection.frame_id_prefix: str | None (default: None)", + " * go2connection.frame_id: str | None (default: None)", + " * go2connection.ip: str (default: 1.1.1.1)", + "", + ] + + +def test_blueprint_arghelp_required(): + """Test required arguments.""" + + class Config(ModuleConfig): + foo: int + spam: str = "eggs" + + class TestModule(Module[Config]): + default_config = Config + + blueprint = TestModule.blueprint() + output = arghelp(blueprint.config(), blueprint) + assert output.split("\n") == [ + " testmodule:", + " * testmodule.frame_id_prefix: str | None (default: None)", + " * testmodule.frame_id: str | None (default: None)", + " * [Required] testmodule.foo: int", + " * testmodule.spam: str (default: eggs)", + "", + ] diff --git a/dimos/visualization/rerun/bridge.py b/dimos/visualization/rerun/bridge.py index 6729f143cd..abea948eec 100644 --- a/dimos/visualization/rerun/bridge.py +++ b/dimos/visualization/rerun/bridge.py @@ -200,6 +200,10 @@ class RerunBridgeModule(Module[Config]): default_config = Config + def __init__(self, **kwargs): + super().__init__(**kwargs) + print("BRIDEG" * 10, self.config.memory_limit) + @lru_cache(maxsize=256) def _visual_override_for_entity_path( self, entity_path: str diff --git a/dimos/web/websocket_vis/websocket_vis_module.py b/dimos/web/websocket_vis/websocket_vis_module.py index 7a5c9587e1..689828ddbf 100644 --- a/dimos/web/websocket_vis/websocket_vis_module.py +++ b/dimos/web/websocket_vis/websocket_vis_module.py @@ -64,7 +64,7 @@ class WebsocketConfig(ModuleConfig): - port: int = 7779 + port: int class WebsocketVisModule(Module[WebsocketConfig]): diff --git a/docs/usage/blueprints.md b/docs/usage/blueprints.md index 80a6b24b19..04976c8790 100644 --- a/docs/usage/blueprints.md +++ b/docs/usage/blueprints.md @@ -230,6 +230,45 @@ The config is normally taken from .env or from environment variables. But you ca blueprint = ModuleA.blueprint().global_config(n_workers=8) ``` +## Providing blueprint configuration to users + +`Blueprint.config()` can be used to get a `pydantic.BaseModel` that can be used to +inspect or test configuration settings that can be passed to `Blueprint.build()`: + +```python session=blueprint-ex1 +# Validate config input +blueprint_args = { + "module1": {"arg1": 5} +} +config = base_blueprint.config() +config(**blueprint_args) # raises pydantic.ValidationError if args are incorrect +``` + +`dimos.robot.cli.dimos.arghelp()` is a helper function that will return a string +containing all details of these arguments (this is how the output is produced when +running `dimos run unitree-go2 --help`, for example): + +```python session=blueprint-ex1 +from dimos.robot.cli.dimos import arghelp +print(arghelp(base_blueprint.config(), base_blueprint)) +``` + +Another function is `dimos.robot.cli.dimos.load_config_args()` which can create the +argument dict for users from a config file, environment variables and CLI arguments: + + +```python session=blueprint-ex1 +from dimos.robot.cli.dimos import load_config_args + +config_path = Path.home() / "base-blueprint-config.json" +cli_args = ["arg1=5"] +blueprint_args = load_config_args(base_blueprint.config(), cli_args, config_path) +# Test user input is valid +config(**blueprint_args) +# Then we can build the blueprint +base_blueprint.build(blueprint_args) +``` + ## Calling the methods of other modules Imagine you have this code: From 87c761d63e1313a9763d61dbe23ba628b770e856 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Fri, 13 Mar 2026 11:49:15 +0000 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: Sam Bull --- dimos/core/module.py | 1 - dimos/core/worker.py | 2 -- dimos/robot/cli/dimos.py | 1 - dimos/visualization/rerun/bridge.py | 4 ---- dimos/web/websocket_vis/websocket_vis_module.py | 2 +- 5 files changed, 1 insertion(+), 9 deletions(-) diff --git a/dimos/core/module.py b/dimos/core/module.py index 9618f99f29..80fdffe02f 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -107,7 +107,6 @@ class ModuleBase(Configurable[ModuleConfigT], Resource): def __init__(self, config_args: dict[str, Any]): super().__init__(**config_args) - print("MODULE", self, config_args, self.config) self._module_closed_lock = threading.Lock() self._loop, self._loop_thread = get_loop() self._disposables = CompositeDisposable() diff --git a/dimos/core/worker.py b/dimos/core/worker.py index fcc64192f5..aae4fbdf6f 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -214,7 +214,6 @@ def deploy_module( "module_class": module_class, "kwargs": kwargs, } - print(module_class, kwargs) with self._lock: self._conn.send(request) response = self._conn.recv() @@ -378,7 +377,6 @@ def _worker_loop(conn: Connection, instances: dict[int, Any], worker_id: int) -> except Exception as e: response["error"] = f"{e.__class__.__name__}: {e}\n{traceback.format_exc()}" - raise try: conn.send(response) diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index f160ac841c..dae497d689 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -204,7 +204,6 @@ def run( setup_exception_handler() cli_config_overrides: dict[str, Any] = ctx.obj - # global_config.update(**cli_config_overrides) # Clean stale registry entries stale = cleanup_stale() diff --git a/dimos/visualization/rerun/bridge.py b/dimos/visualization/rerun/bridge.py index abea948eec..6729f143cd 100644 --- a/dimos/visualization/rerun/bridge.py +++ b/dimos/visualization/rerun/bridge.py @@ -200,10 +200,6 @@ class RerunBridgeModule(Module[Config]): default_config = Config - def __init__(self, **kwargs): - super().__init__(**kwargs) - print("BRIDEG" * 10, self.config.memory_limit) - @lru_cache(maxsize=256) def _visual_override_for_entity_path( self, entity_path: str diff --git a/dimos/web/websocket_vis/websocket_vis_module.py b/dimos/web/websocket_vis/websocket_vis_module.py index 689828ddbf..7a5c9587e1 100644 --- a/dimos/web/websocket_vis/websocket_vis_module.py +++ b/dimos/web/websocket_vis/websocket_vis_module.py @@ -64,7 +64,7 @@ class WebsocketConfig(ModuleConfig): - port: int + port: int = 7779 class WebsocketVisModule(Module[WebsocketConfig]): From b313340c9ea8494b19f83d5bcaf0447fda3b1d3a Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Fri, 13 Mar 2026 11:49:33 +0000 Subject: [PATCH 3/4] Update dimos/core/worker.py --- dimos/core/worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dimos/core/worker.py b/dimos/core/worker.py index aae4fbdf6f..dca561f16c 100644 --- a/dimos/core/worker.py +++ b/dimos/core/worker.py @@ -343,7 +343,6 @@ def _worker_loop(conn: Connection, instances: dict[int, Any], worker_id: int) -> module_class = request["module_class"] kwargs = request["kwargs"] module_id = request["module_id"] - print("DEPLOY", module_class, kwargs) instance = module_class(**kwargs) instances[module_id] = instance response["result"] = module_id From 73b434a1c1f293b05bdbf5acbfcc0f918ace0e2b Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Fri, 13 Mar 2026 15:24:02 +0000 Subject: [PATCH 4/4] Typing fixes --- dimos/core/blueprints.py | 13 +++++++------ dimos/core/module.py | 2 +- dimos/core/module_coordinator.py | 3 ++- dimos/core/worker_manager.py | 4 ++-- dimos/robot/cli/dimos.py | 29 ++++++++++++++++++----------- 5 files changed, 30 insertions(+), 21 deletions(-) diff --git a/dimos/core/blueprints.py b/dimos/core/blueprints.py index 1559c55de3..31f0390b2f 100644 --- a/dimos/core/blueprints.py +++ b/dimos/core/blueprints.py @@ -14,7 +14,7 @@ from abc import ABC from collections import defaultdict -from collections.abc import Callable, Mapping +from collections.abc import Callable, Mapping, MutableMapping from dataclasses import dataclass, field, replace from functools import cached_property, reduce import operator @@ -132,10 +132,10 @@ def create(cls, module: type[ModuleBase], **kwargs: Any) -> "Blueprint": def disabled_modules(self, *modules: type[ModuleBase]) -> "Blueprint": return replace(self, disabled_modules_tuple=self.disabled_modules_tuple + modules) - def config(self) -> BaseModel: + def config(self) -> type[BaseModel]: configs = {b.module.name: (b.module.default_config | None, None) for b in self.blueprints} configs["g"] = (GlobalConfig | None, None) - return create_model("BlueprintConfig", __config__={"extra": "forbid"}, **configs) + return create_model("BlueprintConfig", __config__={"extra": "forbid"}, **configs) # type: ignore[call-overload,no-any-return] def transports(self, transports: dict[tuple[str, type], Any]) -> "Blueprint": return replace(self, transport_map=MappingProxyType({**self.transport_map, **transports})) @@ -284,7 +284,7 @@ def _deploy_all_modules( self, module_coordinator: ModuleCoordinator, global_config: GlobalConfig, - blueprint_args: dict[str, dict[str, Any]], + blueprint_args: Mapping[str, Mapping[str, Any]], ) -> None: module_specs: list[ModuleSpec] = [] for blueprint in self._active_blueprints: @@ -482,10 +482,11 @@ def _connect_rpc_methods(self, module_coordinator: ModuleCoordinator) -> None: def build( self, - blueprint_args: Mapping[str, Any] | None = None, + blueprint_args: MutableMapping[str, Any] | None = None, ) -> ModuleCoordinator: logger.info("Building the blueprint") global_config.update(**dict(self.global_config_overrides)) + blueprint_args = blueprint_args or {} if "g" in blueprint_args: global_config.update(**blueprint_args.pop("g")) @@ -498,7 +499,7 @@ def build( module_coordinator.start() # all module constructors are called here (each of them setup their own) - self._deploy_all_modules(module_coordinator, global_config, blueprint_args or {}) + self._deploy_all_modules(module_coordinator, global_config, blueprint_args) self._connect_streams(module_coordinator) self._connect_rpc_methods(module_coordinator) self._connect_module_refs(module_coordinator) diff --git a/dimos/core/module.py b/dimos/core/module.py index 80fdffe02f..b99a47a757 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -120,7 +120,7 @@ def __init__(self, config_args: dict[str, Any]): @classproperty def name(self) -> str: """Name for this module to be used for blueprint configs.""" - return self.__name__.lower() + return self.__name__.lower() # type: ignore[attr-defined,no-any-return] @property def frame_id(self) -> str: diff --git a/dimos/core/module_coordinator.py b/dimos/core/module_coordinator.py index efed155cf0..7a4f10cee2 100644 --- a/dimos/core/module_coordinator.py +++ b/dimos/core/module_coordinator.py @@ -14,6 +14,7 @@ from __future__ import annotations +from collections.abc import Mapping from concurrent.futures import ThreadPoolExecutor import threading from typing import TYPE_CHECKING, Any @@ -129,7 +130,7 @@ def deploy( return module # type: ignore[return-value] def deploy_parallel( - self, module_specs: list[ModuleSpec], blueprint_args: dict[str, dict[str, Any]] + self, module_specs: list[ModuleSpec], blueprint_args: Mapping[str, Mapping[str, Any]] ) -> list[ModuleProxy]: if not self._client: raise ValueError("Not started") diff --git a/dimos/core/worker_manager.py b/dimos/core/worker_manager.py index ff7121934f..96b9e227f8 100644 --- a/dimos/core/worker_manager.py +++ b/dimos/core/worker_manager.py @@ -14,7 +14,7 @@ from __future__ import annotations -from collections.abc import Iterable +from collections.abc import Iterable, Mapping from concurrent.futures import ThreadPoolExecutor from typing import Any @@ -64,7 +64,7 @@ def deploy( def deploy_parallel( self, module_specs: Iterable[ModuleSpec], - blueprint_args: dict[str, dict[str, Any]], + blueprint_args: Mapping[str, Mapping[str, Any]], ) -> list[RPCClient]: if self._closed: raise RuntimeError("WorkerManager is closed") diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index dae497d689..ea336b98f9 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -14,6 +14,7 @@ from __future__ import annotations +from collections.abc import Iterable from datetime import datetime, timezone import inspect import json @@ -32,13 +33,14 @@ import typer from dimos.agents.mcp.mcp_adapter import McpAdapter, McpError -from dimos.core.blueprints import Blueprint +from dimos.core.blueprints import Blueprint, _BlueprintAtom from dimos.core.global_config import GlobalConfig, global_config from dimos.core.run_registry import get_most_recent, is_pid_alive, stop_entry from dimos.utils.logging_config import setup_logger try: - from gi.repository import GLib + # Not a dependency, just the best way to get config path if available. + from gi.repository import GLib # type: ignore[import-untyped] except ImportError: CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) else: @@ -120,7 +122,13 @@ def callback(**kwargs) -> None: # type: ignore[no-untyped-def] main.callback()(create_dynamic_callback()) # type: ignore[no-untyped-call] -def arghelp(config: BaseModel, blueprint: Blueprint, indent: str = " ", module: str = "") -> str: +def arghelp( + config: type[BaseModel], + blueprint: Blueprint, + indent: str = " ", + module: str = "", + _atom: _BlueprintAtom | None = None, +) -> str: output = "" for k, info in config.model_fields.items(): if k == "g": @@ -130,22 +138,23 @@ def arghelp(config: BaseModel, blueprint: Blueprint, indent: str = " ", modul # Can't be specified on CLI continue - if issubclass(t, BaseModel): + if t is not None and issubclass(t, BaseModel): output += f"{indent}{module}{k}:\n" # Find blueprint atom bp = next(bp for bp in blueprint.blueprints if bp.module.name == k) - output += arghelp(t, bp, indent=indent + " ", module=module + k + ".") + output += arghelp(t, blueprint, indent=indent + " ", module=module + k + ".", _atom=bp) else: + assert _atom is not None # Use __name__ to avoid "" style output on basic types. display_type = t.__name__ if isinstance(t, type) else t - required = "[Required] " if info.is_required() and k not in blueprint.kwargs else "" - d = blueprint.kwargs.get(k, info.default) + required = "[Required] " if info.is_required() and k not in _atom.kwargs else "" + d = _atom.kwargs.get(k, info.default) default = f" (default: {d})" if d is not PydanticUndefined else "" output += f"{indent}* {required}{module}{k}: {display_type}{default}\n" return output -def load_config_args(config: BaseModel, args: Iterable[str], path: Path) -> dict[str, Any]: +def load_config_args(config: type[BaseModel], args: Iterable[str], path: Path) -> dict[str, Any]: try: kwargs = json.loads(path.read_text()) except (OSError, json.JSONDecodeError): @@ -172,7 +181,7 @@ def load_config_args(config: BaseModel, args: Iterable[str], path: Path) -> dict # This will help catch misspellings and similar mistakes. config(**kwargs) - return kwargs + return kwargs # type: ignore[no-any-return] @main.command() @@ -546,8 +555,6 @@ def restart( typer.echo(f"Error: failed to restart — {exc}", err=True) raise typer.Exit(1) - sub_command(blueprint_args) - @main.command() def show_config(ctx: typer.Context) -> None: