Feat/memory2 #1536

Open
leshy wants to merge 120 commits into dev from feat/memory2

Conversation

@leshy leshy commented Mar 12, 2026

Summary

New memory subsystem (memory2) — a lazy, pull-based stream engine for storing, querying, and transforming robot observations with full type safety.

Key pieces:

  • Generic Stream[T] / Observation[T] model with typed filters (TimeRange, NearFilter, spatial R*Tree index)
  • Pluggable backends: in-memory ListBackend and full SqliteBackend (JSONB metadata, vec0 vector search, R*Tree spatial index, cursor pagination)
  • Codec layer (Pickle, JPEG/TurboJPEG, LCM) with blob storage (file / SQLite)
  • Embedding & vector search with search_embedding(str | image)
  • Live reactive layer via LiveChannel with .observable() / .subscribe()
  • Transform streams (projection, chaining, bare generator functions as transforms)

Pluggable extension points: Store (observation backend), LiveChannel (reactive pub/sub), BlobStore (binary data), EmbeddingStore (vector search) — each swappable independently.
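The names below are illustrative rather than the actual memory2 API — a minimal, self-contained sketch of the lazy pull-based pattern the summary describes: composing filters returns a new stream object and touches no data until iteration.

```python
from dataclasses import dataclass, replace
from typing import Generic, Iterator, TypeVar

T = TypeVar("T")

@dataclass(frozen=True)
class Observation(Generic[T]):
    ts: float
    data: T

@dataclass(frozen=True)
class Stream(Generic[T]):
    # hypothetical stand-in for a backend: here just an in-memory tuple
    source: tuple  # tuple[Observation[T], ...]
    start: float = float("-inf")
    end: float = float("inf")

    def after(self, ts: float) -> "Stream[T]":
        # composing a filter builds a new Stream; nothing is fetched yet
        return replace(self, start=max(self.start, ts))

    def before(self, ts: float) -> "Stream[T]":
        return replace(self, end=min(self.end, ts))

    def __iter__(self) -> Iterator[Observation[T]]:
        # data is only pulled when someone actually iterates
        return (o for o in self.source if self.start <= o.ts < self.end)

obs = tuple(Observation(float(i), f"frame-{i}") for i in range(5))
stream = Stream(obs).after(1.0).before(4.0)  # lazy: no work done here
result = [o.data for o in stream]            # pull happens now
```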

Documentation:

Breaking Changes

None — memory2 is a new parallel module; existing memory/ is untouched.

How to Test

cd dimos/memory2 && python -m pytest

Contributor License Agreement

  • I have read and approved the CLA.

leshy added 30 commits March 4, 2026 22:11
…e spatial index, lazy data loading

- Pose stored as 7 real columns (x/y/z + quaternion) instead of blob, enabling R*Tree spatial indexing
- Payload moved to separate {name}_payload table with lazy loading via _data_loader closure
- R*Tree virtual table created per stream for .near() bounding-box queries
- Added __iter__ to Stream for lazy iteration via fetch_pages
- Added embedding_stream() to Session ABC
- Updated _streams metadata with parent_stream and embedding_dim columns
- Codec module extracted (LcmCodec, PickleCodec, codec_for_type)
- Fixed broken memory_old.timeseries imports (memory.timeseries → memory_old.timeseries)
- Tests now use real Image data from TimedSensorReplay("unitree_go2_bigoffice/video")
- 32/32 tests passing, mypy clean
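The per-stream R*Tree table and the `.near()` bounding-box lookup can be sketched with stdlib `sqlite3` (table and column names here are hypothetical, and this assumes SQLite was compiled with the R*Tree module, which standard CPython builds normally enable):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# one R*Tree virtual table per stream; points store min == max per axis
conn.execute(
    "CREATE VIRTUAL TABLE poses_rtree USING rtree(id, min_x, max_x, min_y, max_y)"
)
conn.execute("INSERT INTO poses_rtree VALUES (1, 1.0, 1.0, 2.0, 2.0)")
conn.execute("INSERT INTO poses_rtree VALUES (2, 50.0, 50.0, 60.0, 60.0)")

# .near()-style query: everything inside the (0,0)-(10,10) bounding box
rows = conn.execute(
    "SELECT id FROM poses_rtree "
    "WHERE min_x >= ? AND max_x <= ? AND min_y >= ? AND max_y <= ?",
    (0.0, 10.0, 0.0, 10.0),
).fetchall()
```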
…dowTransformer, E2E test

- Add JpegCodec as default codec for Image types (2.76MB → 64KB per frame)
- Preserve frame_id in JPEG header; ts stored in meta table
- Add ingest() helper for bulk-loading (ts, payload) iterables into streams
- Add QualityWindowTransformer: best-frame-per-window (supports backfill + live)
- EmbeddingTransformer sets output_type=Embedding automatically
- Require payload_type when creating new streams (no silent PickleCodec fallback)
- TransformStream.store() accepts payload_type, propagated through materialize_transform
- E2E test: 5min video → sharpness filter → CLIP embed → text search
- Move test_sqlite.py next to sqlite.py, update Image comparisons for lossy codec
- Add sqlite-vec dependency
…rojection

- Add parent_id to Observation, append(), do_append(), and _META_COLS
- All transformers (PerItem, QualityWindow, Embedding) pass obs.id as parent_id
- SqliteEmbeddingBackend._row_to_obs() wires _source_data_loader via parent_id
- EmbeddingObservation.data now auto-projects to parent stream's payload (e.g. Image)
- No more timestamp-matching hacks to find source data from embedding results
- materialize_transform() now UPDATEs _streams.parent_stream so stream-level
  lineage is discoverable (prerequisite for .join())
- Fix mypy: narrow parent_table type in _source_loader closure
- Add plans/memory/tasks.md documenting all spec-vs-impl gaps
Adds LineageFilter that compiles to nested SQL subqueries walking the
parent_id chain. project_to(target) returns a chainable target Stream
using the same _with_filter mechanism as .after(), .near(), etc.

Also fixes _session propagation in search_embedding/search_text.
EmbeddingStream is a semantic index — search results should be source
observations (Images), not Embedding objects. search_embedding now
auto-projects via project_to when lineage exists, falling back to
EmbeddingStream for standalone streams without parent lineage.
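The subquery-over-`parent_id` idea can be sketched in plain `sqlite3` — schema and column names are hypothetical, and this shows one lineage level rather than the general nested chain:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE frames (id INTEGER PRIMARY KEY, ts REAL);
    CREATE TABLE embeddings (id INTEGER PRIMARY KEY, parent_id INTEGER, dist REAL);
    INSERT INTO frames VALUES (1, 0.0), (2, 1.0);
    INSERT INTO embeddings VALUES (10, 1, 0.2), (11, 2, 0.9);
""")
# project embedding hits back to their source frames by walking parent_id
rows = conn.execute("""
    SELECT f.id FROM frames f
    WHERE f.id IN (SELECT parent_id FROM embeddings WHERE dist < 0.5)
""").fetchall()
```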
- Add CaptionTransformer: wraps Captioner/VlModel, uses caption_batch()
  for backfill efficiency, auto-creates TextStream with FTS on .store()
- Fix Florence2 caption_batch() emitting <pad> tokens (skip_special_tokens)
- E2E script now uses transform pipeline for captioning search results
fetch() now returns ObservationSet instead of plain list, keeping you
in the Stream API. This enables fork-and-zip (one DB query, two uses)
and in-memory re-filtering without re-querying the database.

- Add matches(obs) to all filter dataclasses for in-Python evaluation
- Add ListBackend (in-memory StreamBackend) and ObservationSet class
- Filtered .appended reactive subscription via matches() infrastructure
- Update e2e export script to use fork-and-zip pattern
- 20 new tests (64 total, all passing)
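The `matches(obs)` idea — each filter dataclass carrying an in-Python predicate mirroring its SQL form — can be sketched like this (names are illustrative, not the memory2 classes):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class TimeRange:
    start: float
    end: float

    def matches(self, ts: float) -> bool:
        # in-Python twin of the SQL WHERE clause this filter compiles to
        return self.start <= ts < self.end

# fetched once from the DB, then re-filtered twice in memory (fork-and-zip)
observations = [(0.5, "a"), (1.5, "b"), (2.5, "c")]  # (ts, payload) pairs
early = [p for ts, p in observations if TimeRange(0.0, 1.0).matches(ts)]
late = [p for ts, p in observations if TimeRange(2.0, 3.0).matches(ts)]
```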
EmbeddingStream now holds an optional model reference, so
search_embedding auto-dispatches: str → embed_text(), image → embed(),
Embedding/list[float] → use directly. The model is wired through
materialize_transform and also accepted via embedding_stream().
- Fix SpatialImage/SpatialEntry dataclass hierarchy in memory_old
- Fix import path in memory_old/test_embedding.py
- Add None guard for obs.ts in run_viz_demo.py
- Add payload_type/session kwargs to base Stream.store() signature
- Type-annotate embeddings as EmbeddingStream in run_e2e_export.py
- Add similarity scores, raw search mode, pose ingest, viz pipeline
- Normalize similarity scores relative to min/max (CLIP clusters in narrow band)
- Add distance_transform_edt spread so dots radiate outward, fading to 0
- Bump default search k to 200 for denser heatmaps
- Validate stream names and tag keys as SQL identifiers
- Allowlist order_by fields to {id, ts}
- Re-sort vector search results by distance rank after IN-clause fetch
- Make TagsFilter hashable (tuple of pairs instead of dict)
- Remove dead code in memory_old/embedding.py
- Add scipy-stubs, fix distance_transform_edt type annotations
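Identifier validation of the kind described — stream names and tag keys end up interpolated into SQL, so they must be plain identifiers — can be sketched as (function name hypothetical):

```python
import re

_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def quote_identifier(name: str) -> str:
    # reject anything that is not a bare identifier before it reaches SQL
    if not _IDENT.fullmatch(name):
        raise ValueError(f"invalid SQL identifier: {name!r}")
    return f'"{name}"'
```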
- Add dimos/memory/rerun.py: to_rerun() sends stream data to Rerun
  with auto-derived entity paths and no wall-clock timeline contamination
- Fix Stream.fetch_pages() to respect limit_val (was always overridden
  by batch_size, making .limit() ineffective during iteration)
- Update viz.py: normalize similarities with 20% floor cutoff,
  sort timeline by timestamp, add log_top_images()
- Convert run_e2e_export.py to pytest with cached DB fixture
- Update plans/memory docs to match current implementation
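The fixed semantics — page through results in batches, but stop at the overall limit even when it is smaller than a page — can be sketched with a hypothetical stand-alone `fetch_pages`:

```python
from typing import Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")

def fetch_pages(
    rows: Iterable[T], batch_size: int = 100, limit: Optional[int] = None
) -> Iterator[List[T]]:
    """Yield rows in pages, honouring `limit` even when it is below batch_size."""
    emitted = 0
    page: List[T] = []
    for row in rows:
        if limit is not None and emitted >= limit:
            break  # the bug was letting batch_size override this cap
        page.append(row)
        emitted += 1
        if len(page) == batch_size:
            yield page
            page = []
    if page:
        yield page

pages = list(fetch_pages(range(10), batch_size=4, limit=5))
```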
…, fix mypy

- Rename to test_e2e_export.py (it's a pytest file, not a standalone script)
- Fix Generator return type and type: ignore for mypy
- Delete viz.py (replaced by rerun.py) and run_viz_demo.py
- Update docs/api.md to reference rerun.py instead of viz.py
…ad reduction

- Switch JpegCodec from cv2.imencode to TurboJPEG (2-5x faster encode/decode)
- Lower default JPEG quality from 90 to 50 for smaller storage footprint
- Downscale sharpness computation to 160px Laplacian variance (10-20x cheaper)
- Add MemoryModule with plain-Python sharpness windowing (no rx timer overhead)
- Limit OpenCV threads: 2 globally in worker entrypoint, 1 in MemoryModule
- Cap global rx ThreadPoolScheduler at 8 workers (was unbounded cpu_count)
- Refactor SqliteEmbeddingBackend/SqliteTextBackend to use _post_insert hook
- Encode payload before meta insert to prevent orphaned rows on codec error
- Add `dimos ps` CLI command and `dps` entrypoint for non-interactive process listing
- Add unitree-go2-memory blueprint
try:
    import cv2
    cv2.setNumThreads(2)
except ImportError:
    pass

Contributor

I don't think opencv stuff belongs in worker.py. Maybe add a bootstrap_worker function in a different file and call it here.

Contributor Author

@leshy leshy Mar 13, 2026

yeah, me neither.

this was actually consuming a bunch of resources, so I quickly prefixed dimos startup with this setting, but it obviously doesn't belong here.

should we have some "general lib config" file that's preloaded for all dimos runs? where should this be run from?

Contributor

I think just configuring this in a different file should be sufficient.

Comment on lines +118 to +119
Extends Resource (start/stop) but does NOT manage its dependencies'
lifecycle — the caller owns the session / connection.
Contributor

@paul-nechifor paul-nechifor Mar 13, 2026

I don't get it. Why does it not manage its own dependencies? I see Sqlite has blank start/stop, and the file blobstore has mkdir in start (which is not needed, because the parents are created when put()-ing files).

If it doesn't use start/stop why does it need to be a resource?

Edit: I see. Sqlite for example takes a connection. In that case BlobStore should not be a Resource.

Also, these types of objects are usually called repositories: https://www.geeksforgeeks.org/system-design/repository-design-pattern/

So maybe rename it to BlobRepository?

Contributor Author

@leshy leshy Mar 13, 2026

sorry, that was a mistake in the comment.

normally I'd expect blob stores (and vector stores) to be fully fledged stores that manage their own lifecycles, or at least have the option to (hence the contextmanager stuff), since we might have some ultra-specialized systems for these things.

SqliteBlobStore + SqliteStore is a special case in which the sqlite store takes over blob/vector store lifecycle management (since they need to share a session). It then uses efficient JOINs to fetch data if .data for a stream is specified as non-lazy.

this smart coupling is a bit rough around the edges atm; this PR is mostly for us to agree on the general approach and how we treat data. There will be a bunch of follow-up work, which we can split and organize IMO. We don't really have multiple databases yet to properly test coupling/decoupling, but this is the idea.

the reason for decoupling the blob store from the metadata store is that we want metadata in a relational database, but will likely have a more appropriate storage system for the actual large binary blobs.

import sqlite3


class SqliteBlobStore(BlobStore):
Contributor

Should this API be thread-safe? Looking at the code, it does look like it might be used from different threads at the same time?

Contributor Author

the idea is mostly not: a database session should be used from a single thread, and the underlying database is responsible for efficient parallelization (the memory store is just a prototype for tests).

there are a few contrived ways to escape this single-thread restriction and cause problems (via .observable() or .subscribe()), but I think you'd really have to try to cause an issue there.
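The single-thread session contract described above has a stdlib analogue: `sqlite3` connections refuse cross-thread use by default (`check_same_thread=True`), so the driver itself enforces this kind of restriction:

```python
import sqlite3
import threading

conn = sqlite3.connect(":memory:")  # check_same_thread=True by default
errors = []

def use_from_other_thread() -> None:
    try:
        conn.execute("SELECT 1")
    except sqlite3.ProgrammingError as e:
        # the stdlib driver rejects using a connection outside its home thread
        errors.append(e)

t = threading.Thread(target=use_from_other_thread)
t.start()
t.join()
```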

leshy added 12 commits March 13, 2026 15:15
Separate pure-definition files (protocols, ABCs, dataclasses) from
implementation files by moving them into a type/ subpackage:
- backend.py → type/backend.py
- type.py → type/observation.py
- filter.py → type/filter.py

Added type/__init__.py with re-exports for convenience imports.
Updated all 24 importing files across the module.
    def delete(self, stream_name: str, key: int) -> None:
        try:
            self._conn.execute(f'DELETE FROM "{stream_name}_blob" WHERE id = ?', (key,))
        except Exception:
Contributor

Hm, I'm not sure we should be hiding this. It should be at least a warning.

        super().__init__(**kwargs)
        self._name = name
        self._observations: list[Observation[T]] = []
        self._next_id = 0
Contributor

You can use this instead of _next_id (it's thread safe).

from dimos.utils.sequential_ids import SequentialIds
_seq_ids = SequentialIds()
new_id = _seq_ids.next()
