Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions dimos/control/test_trajectory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
from typing import TypedDict

import pytest

from dimos.core import LCMTransport
from dimos.core.global_config import GlobalConfig
from dimos.mapping.occupancy.path_resampling import smooth_resample_path
from dimos.mapping.pointclouds.occupancy import height_cost_occupancy
from dimos.mapping.voxels import VoxelGridMapper
from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
from dimos.msgs.geometry_msgs.Vector3 import Vector3
from dimos.msgs.nav_msgs.OccupancyGrid import OccupancyGrid
from dimos.msgs.nav_msgs.Path import Path
from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2
from dimos.navigation.replanning_a_star.min_cost_astar import min_cost_astar
from dimos.navigation.replanning_a_star.navigation_map import NavigationMap
from dimos.utils.data import get_data
from dimos.utils.testing.replay import TimedSensorReplay


class Moment(TypedDict):
global_map: PointCloud2
costmap: OccupancyGrid
navigation_costmap: OccupancyGrid
path: Path


@pytest.fixture(scope="session")
def get_moment():
@functools.lru_cache(maxsize=1)
def moment_provider(seek=10.0) -> Moment:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seek parameter is unused

Suggested change
def moment_provider(seek=10.0) -> Moment:
def moment_provider() -> Moment:

data_dir = "unitree_go2_bigoffice"
get_data(data_dir)

lidar_data = TimedSensorReplay(f"{data_dir}/lidar")

voxels = VoxelGridMapper()

for frame in lidar_data:
voxels.add_frame(frame)

global_map = voxels.get_global_pointcloud2()
Comment on lines +47 to +57
Copy link
Contributor

@paul-nechifor paul-nechifor Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of reconstructing the point cloud, you can get it from this existing fixture (which you might have to move):

@pytest.fixture
def big_office() -> PointCloud:
    return read_pointcloud(get_data("big_office.ply"))

It was created from unitree_go2_bigoffice.

It's in dimos/mapping/pointclouds/test_occupancy.py

Copy link
Contributor Author

@leshy leshy Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup mustafa can rewrite if he needs to, just wanted to sketch for him getting a path, good to know, didn't know we store a full pcd

voxels.stop()

costmap = height_cost_occupancy(global_map)

cfg = GlobalConfig()
nav_map = NavigationMap(cfg)
nav_map.update(costmap)
navigation_costmap = nav_map.make_gradient_costmap()

start = Vector3(-12.5, 12.5)
goal = PoseStamped(position=Vector3(-3, -12))

path = min_cost_astar(navigation_costmap, goal.position, start)
assert path is not None, f"No path found from {start} to {goal}"
path = smooth_resample_path(path, goal, spacing=0.1)

return Moment(
global_map=global_map,
costmap=costmap,
navigation_costmap=navigation_costmap,
path=path,
)

return moment_provider


@pytest.fixture(scope="session")
def publish_moment():
def publisher(moment: Moment) -> None:
for key, value in moment.items():
t = LCMTransport(f"/{key}", type(value))
t.publish(value)
t.lcm.stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use t.stop() instead of directly accessing t.lcm.stop() to maintain proper encapsulation

Suggested change
t.lcm.stop()
t.stop()

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


return publisher


def test_basic(get_moment, publish_moment):
"""Test that we can create a Moment and publish it."""
moment = get_moment()
print(moment)
assert "global_map" in moment
assert "costmap" in moment
assert "navigation_costmap" in moment
assert "path" in moment
assert len(moment["path"].poses) > 0
print(moment.get("path"))
publish_moment(moment)
17 changes: 0 additions & 17 deletions dimos/robot/unitree_webrtc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Compatibility package for legacy dimos.robot.unitree_webrtc imports."""

from importlib import import_module
import sys

_ALIAS_MODULES = {
"demo_error_on_name_conflicts": "dimos.robot.unitree.demo_error_on_name_conflicts",
"keyboard_teleop": "dimos.robot.unitree.keyboard_teleop",
"mujoco_connection": "dimos.robot.unitree.mujoco_connection",
"type": "dimos.robot.unitree.type",
"unitree_g1_skill_container": "dimos.robot.unitree.g1.skill_container",
"unitree_skill_container": "dimos.robot.unitree.unitree_skill_container",
}

for alias, target in _ALIAS_MODULES.items():
sys.modules[f"{__name__}.{alias}"] = import_module(target)
Loading