From 42abc72492139bfda62a315727c04730fa2ebda3 Mon Sep 17 00:00:00 2001 From: webcoderz <19884161+webcoderz@users.noreply.github.com> Date: Tue, 31 Dec 2024 16:40:35 -0500 Subject: [PATCH 1/5] postgres memory provider WIP --- examples/pg-memory.py | 46 +++++ src/controlflow/memory/memory.py | 13 ++ src/controlflow/memory/providers/postgres.py | 198 +++++++++++++++++++ 3 files changed, 257 insertions(+) create mode 100644 examples/pg-memory.py create mode 100644 src/controlflow/memory/providers/postgres.py diff --git a/examples/pg-memory.py b/examples/pg-memory.py new file mode 100644 index 00000000..d1d1ea07 --- /dev/null +++ b/examples/pg-memory.py @@ -0,0 +1,46 @@ +from controlflow.memory.memory import Memory +from controlflow.memory.providers.postgres import PostgresMemory +import controlflow as cf + + +provider = PostgresMemory( + database_url="postgresql://postgres:postgres@localhost:5432/your_database", + table_name="vector_db", +) +# Create a memory module for user preferences +user_preferences = cf.Memory( + key="user_preferences", + instructions="Store and retrieve user preferences.", + provider=provider +) + +# Create an agent with access to the memory +agent = cf.Agent(memories=[user_preferences]) + + +# Create a flow to ask for the user's favorite color +@cf.flow +def remember_color(): + return cf.run( + "Ask the user for their favorite color and store it in memory", + agents=[agent], + interactive=True, + ) + + +# Create a flow to recall the user's favorite color +@cf.flow +def recall_color(): + return cf.run( + "What is the user's favorite color?", + agents=[agent], + ) + + +if __name__ == "__main__": + print("First flow:") + remember_color() + + print("\nSecond flow:") + result = recall_color() + print(result) diff --git a/src/controlflow/memory/memory.py b/src/controlflow/memory/memory.py index 2e521081..84849c8d 100644 --- a/src/controlflow/memory/memory.py +++ b/src/controlflow/memory/memory.py @@ -166,4 +166,17 @@ def get_memory_provider(provider: str) -> MemoryProvider: return lance_providers.LanceMemory() + + # --- Postgres --- + elif provider.startswith("postgres"): + try: + import sqlalchemy + except ImportError: + raise ImportError( + "To use Postgres as a memory provider, please install the `sqlalchemy` package." + ) + + import controlflow.memory.providers.postgres as postgres_providers + + return postgres_providers.PostgresMemory() raise ValueError(f'Memory provider "{provider}" could not be loaded from a string.') diff --git a/src/controlflow/memory/providers/postgres.py b/src/controlflow/memory/providers/postgres.py new file mode 100644 index 00000000..b9f1acc3 --- /dev/null +++ b/src/controlflow/memory/providers/postgres.py @@ -0,0 +1,198 @@ +import uuid +from pathlib import Path +from typing import Callable, Dict, Optional + +import sqlalchemy +from sqlalchemy import Column, String, select, text +from sqlalchemy.orm import declarative_base, Session, sessionmaker +from sqlalchemy_utils import database_exists, create_database +from sqlalchemy.exc import ProgrammingError +from pgvector.sqlalchemy import Vector +from pydantic import Field + +import controlflow +from controlflow.memory.memory import MemoryProvider + +try: + # For embeddings, we can reuse the same LanceDB approach to retrieve an embedding function: + from lancedb.embeddings import get_registry +except ImportError: + raise ImportError( + "To use an embedding function similar to LanceDB's default, " + "please install lancedb with: pip install lancedb" + ) + +# SQLAlchemy base class for declarative models +Base = declarative_base() + +class SQLMemoryTable(Base): + """ + A simple declarative model that represents a memory record. + + We’ll dynamically set the __tablename__ at runtime. + """ + __abstract__ = True + id = Column(String, primary_key=True) + text = Column(String) + # Use pgvector for storing embeddings in a Postgres Vector column + vector = Column(Vector(dim=1536)) # Adjust dimension to match your embedding model + + +class PostgresMemory(MemoryProvider): + """ + A ControlFlow MemoryProvider that stores text + embeddings in PostgreSQL + using SQLAlchemy and pg_vector. Each Memory module gets its own table. + """ + + # Default database URL. You can point this to your actual Postgres instance. + # Requires the pgvector extension installed and the sqlalchemy-pgvector package. + database_url: str = Field( + default="postgresql://user:password@localhost:5432/your_database", + description="SQLAlchemy-compatible database URL to a Postgres instance with pgvector.", + ) + table_name: str = Field( + "memory_{key}", + description=""" + Name of the table to store this memory partition. "{key}" will be replaced + by the memory’s key attribute. + """, + ) + embedding_fn: Callable = Field( + default_factory=lambda: get_registry() + .get("openai") + .create(name="text-embedding-ada-002"), + description=( + "The function used to generate vector embeddings for text. " + "Defaults to an OpenAI text-embedding-ada-002 model using LanceDB's registry." + ), + ) + + + + # Internal: keep a cached Session maker + _SessionLocal: Optional[sessionmaker] = None + + # This dict will map "table_name" -> "model class" + _table_class_cache: Dict[str, Base] = {} + + def configure(self, memory_key: str) -> None: + """ + Configure a SQLAlchemy session and ensure the table for this + memory partition is created if it does not already exist. + """ + engine = sqlalchemy.create_engine(self.database_url) + + + # 2) If DB doesn't exist, create it! + if not database_exists(engine.url): + create_database(engine.url) + + with engine.connect() as conn: + conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + conn.commit() + + self._SessionLocal = sessionmaker(bind=engine) + + # Dynamically create a specialized table model for this memory_key + table_name = self.table_name.format(key=memory_key) + + # 1) Check if table already in metadata + if table_name not in Base.metadata.tables: + # 2) Create the dynamic class + table + memory_model = type( + f"SQLMemoryTable_{memory_key}", + (SQLMemoryTable,), + {"__tablename__": table_name}, + ) + + try: + Base.metadata.create_all(engine, tables=[memory_model.__table__]) + # Store it in the cache + self._table_class_cache[table_name] = memory_model + except ProgrammingError as e: + raise RuntimeError(f"Failed to create table {table_name}: {e}") + + + def _get_session(self) -> Session: + if not self._SessionLocal: + raise RuntimeError( + "Session is not initialized. Make sure to call configure() first." + ) + return self._SessionLocal() + + def _get_table(self, memory_key: str) -> Base: + """ + Return a dynamically generated declarative model class + mapped to the memory_{key} table. Each memory partition + has a separate table. + """ + table_name = self.table_name.format(key=memory_key) + + # Return the cached class if already built + if table_name in self._table_class_cache: + return self._table_class_cache[table_name] + + # If for some reason it's not there, create it now (or raise error): + memory_model = type( + f"SQLMemoryTable_{memory_key}", + (SQLMemoryTable,), + {"__tablename__": table_name}, + ) + self._table_class_cache[table_name] = memory_model + return memory_model + + def add(self, memory_key: str, content: str) -> str: + """ + Insert a new memory record into the Postgres table, + generating an embedding and storing it in a vector column. + Returns the memory’s ID (uuid). + """ + memory_id = str(uuid.uuid4()) + model_cls = self._get_table(memory_key) + + # Generate an embedding for the content + embedding = self.embedding_fn(content) + + with self._get_session() as session: + record = model_cls(id=memory_id, text=content, vector=embedding) + session.add(record) + session.commit() + + return memory_id + + def delete(self, memory_key: str, memory_id: str) -> None: + """ + Delete a memory record by its UUID. + """ + model_cls = self._get_table(memory_key) + + with self._get_session() as session: + session.query(model_cls).filter(model_cls.id == memory_id).delete() + session.commit() + + def search(self, memory_key: str, query: str, n: int = 20) -> Dict[str, str]: + """ + Uses pgvector’s approximate nearest neighbor search with the `<->` operator to find + the top N matching records for the embedded query. Returns a dict of {id: text}. + """ + model_cls = self._get_table(memory_key) + # Generate embedding for the query + query_embedding = self.embedding_fn(query) + + # Postgres syntax to compute distance: vector <-> query_embedding + # We'll parameterize the embedding as a literal array. + with self._get_session() as session: + # The raw SQL approach, because SQLAlchemy doesn’t have built-in syntax + # for `<->` directly yet. We can still pass parameters safely: + stmt = text(f""" + SELECT id, text + FROM {model_cls.__tablename__} + ORDER BY vector <-> :query_vec + LIMIT :limit + """) + results = session.execute( + stmt, + {"query_vec": query_embedding, "limit": n}, + ).all() + + return {row.id: row.text for row in results} \ No newline at end of file From e15d8d679fa6216cf2479c93f7072194d8c7d11a Mon Sep 17 00:00:00 2001 From: webcoderz <19884161+webcoderz@users.noreply.github.com> Date: Tue, 31 Dec 2024 18:30:53 -0500 Subject: [PATCH 2/5] pg memory module --- examples/pg-memory.py | 6 +- src/controlflow/memory/providers/postgres.py | 68 ++++++++++++-------- 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/examples/pg-memory.py b/examples/pg-memory.py index d1d1ea07..01d2248e 100644 --- a/examples/pg-memory.py +++ b/examples/pg-memory.py @@ -5,6 +5,8 @@ provider = PostgresMemory( database_url="postgresql://postgres:postgres@localhost:5432/your_database", + #embedding_dimension=1536, + # embedding_fn=OpenAIEmbeddings(), table_name="vector_db", ) # Create a memory module for user preferences @@ -22,7 +24,7 @@ @cf.flow def remember_color(): return cf.run( - "Ask the user for their favorite color and store it in memory", + "Ask the user for their favorite animal and store it in memory", agents=[agent], interactive=True, ) @@ -32,7 +34,7 @@ def remember_color(): @cf.flow def recall_color(): return cf.run( - "What is the user's favorite color?", + "What is the user's favorite animal?", agents=[agent], ) diff --git a/src/controlflow/memory/providers/postgres.py b/src/controlflow/memory/providers/postgres.py index b9f1acc3..4f1fa526 100644 --- a/src/controlflow/memory/providers/postgres.py +++ b/src/controlflow/memory/providers/postgres.py @@ -9,13 +9,15 @@ from sqlalchemy.exc import ProgrammingError from pgvector.sqlalchemy import Vector from pydantic import Field +from sqlalchemy.dialects.postgresql import ARRAY +from sqlalchemy import select import controlflow from controlflow.memory.memory import MemoryProvider try: - # For embeddings, we can reuse the same LanceDB approach to retrieve an embedding function: - from lancedb.embeddings import get_registry + # For embeddings, we can use langchain_openai or any other library: + from langchain_openai import OpenAIEmbeddings except ImportError: raise ImportError( "To use an embedding function similar to LanceDB's default, " @@ -35,7 +37,7 @@ class SQLMemoryTable(Base): id = Column(String, primary_key=True) text = Column(String) # Use pgvector for storing embeddings in a Postgres Vector column - vector = Column(Vector(dim=1536)) # Adjust dimension to match your embedding model + #vector = Column(Vector(dim=1536)) # Adjust dimension to match your embedding model class PostgresMemory(MemoryProvider): @@ -57,14 +59,17 @@ class PostgresMemory(MemoryProvider): by the memory’s key attribute. """, ) + + embedding_dimension: int = Field( + default=1536, + description="Dimension of the embedding vectors. Match your model's output." + ) + embedding_fn: Callable = Field( - default_factory=lambda: get_registry() - .get("openai") - .create(name="text-embedding-ada-002"), - description=( - "The function used to generate vector embeddings for text. " - "Defaults to an OpenAI text-embedding-ada-002 model using LanceDB's registry." + default_factory=lambda: OpenAIEmbeddings( + model="text-embedding-ada-002", ), + description="A function that turns a string into a vector." ) @@ -75,6 +80,9 @@ class PostgresMemory(MemoryProvider): # This dict will map "table_name" -> "model class" _table_class_cache: Dict[str, Base] = {} + + + def configure(self, memory_key: str) -> None: """ Configure a SQLAlchemy session and ensure the table for this @@ -102,7 +110,11 @@ def configure(self, memory_key: str) -> None: memory_model = type( f"SQLMemoryTable_{memory_key}", (SQLMemoryTable,), - {"__tablename__": table_name}, + { + "__tablename__": table_name, + "vector": Column(Vector(dim=self.embedding_dimension)), + }, + ) try: @@ -119,6 +131,9 @@ def _get_session(self) -> Session: "Session is not initialized. Make sure to call configure() first." ) return self._SessionLocal() + + def _vector_param(self, embedding: list[float]) -> str: + return f"[{', '.join(str(x) for x in embedding)}]" def _get_table(self, memory_key: str) -> Base: """ @@ -136,7 +151,10 @@ def _get_table(self, memory_key: str) -> Base: memory_model = type( f"SQLMemoryTable_{memory_key}", (SQLMemoryTable,), - {"__tablename__": table_name}, + { + "__tablename__": table_name, + "vector": Column(Vector(dim=self.embedding_dimension)), + }, ) self._table_class_cache[table_name] = memory_model return memory_model @@ -151,7 +169,7 @@ def add(self, memory_key: str, content: str) -> str: model_cls = self._get_table(memory_key) # Generate an embedding for the content - embedding = self.embedding_fn(content) + embedding = self.embedding_fn.embed_query(content) with self._get_session() as session: record = model_cls(id=memory_id, text=content, vector=embedding) @@ -177,22 +195,18 @@ def search(self, memory_key: str, query: str, n: int = 20) -> Dict[str, str]: """ model_cls = self._get_table(memory_key) # Generate embedding for the query - query_embedding = self.embedding_fn(query) + query_embedding = self.embedding_fn.embed_query(query) + embedding_col = model_cls.vector + - # Postgres syntax to compute distance: vector <-> query_embedding - # We'll parameterize the embedding as a literal array. with self._get_session() as session: - # The raw SQL approach, because SQLAlchemy doesn’t have built-in syntax - # for `<->` directly yet. We can still pass parameters safely: - stmt = text(f""" - SELECT id, text - FROM {model_cls.__tablename__} - ORDER BY vector <-> :query_vec - LIMIT :limit - """) + results = session.execute( - stmt, - {"query_vec": query_embedding, "limit": n}, - ).all() + select(model_cls.id, model_cls.text) + .order_by(embedding_col.l2_distance(query_embedding)) + .limit(n) + ).all() + + return {row.id: row.text for row in results} + - return {row.id: row.text for row in results} \ No newline at end of file From a63d8c9c0a9533079f1fa094eb13950838f4a975 Mon Sep 17 00:00:00 2001 From: webcoderz <19884161+webcoderz@users.noreply.github.com> Date: Tue, 31 Dec 2024 18:31:48 -0500 Subject: [PATCH 3/5] Update pg-memory.py --- examples/pg-memory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/pg-memory.py b/examples/pg-memory.py index 01d2248e..57d50bf6 100644 --- a/examples/pg-memory.py +++ b/examples/pg-memory.py @@ -24,7 +24,7 @@ @cf.flow def remember_color(): return cf.run( - "Ask the user for their favorite animal and store it in memory", + "Ask the user for their favorite color and store it in memory", agents=[agent], interactive=True, ) @@ -34,7 +34,7 @@ def remember_color(): @cf.flow def recall_color(): return cf.run( - "What is the user's favorite animal?", + "What is the user's favorite color?", agents=[agent], ) From 78b395e5e550db8a6b30e5eb08d58a51f15c4b8a Mon Sep 17 00:00:00 2001 From: webcoderz <19884161+webcoderz@users.noreply.github.com> Date: Tue, 31 Dec 2024 18:44:52 -0500 Subject: [PATCH 4/5] removed unnecessary import and removed my str join function that became unnecessary --- src/controlflow/memory/providers/postgres.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/controlflow/memory/providers/postgres.py b/src/controlflow/memory/providers/postgres.py index 4f1fa526..d1775a23 100644 --- a/src/controlflow/memory/providers/postgres.py +++ b/src/controlflow/memory/providers/postgres.py @@ -1,5 +1,4 @@ import uuid -from pathlib import Path from typing import Callable, Dict, Optional import sqlalchemy @@ -81,8 +80,6 @@ class PostgresMemory(MemoryProvider): _table_class_cache: Dict[str, Base] = {} - - def configure(self, memory_key: str) -> None: """ Configure a SQLAlchemy session and ensure the table for this @@ -132,8 +129,6 @@ def _get_session(self) -> Session: ) return self._SessionLocal() - def _vector_param(self, embedding: list[float]) -> str: - return f"[{', '.join(str(x) for x in embedding)}]" def _get_table(self, memory_key: str) -> Base: """ From 2793154ec01bb4a174f09567df72362df7c4b300 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Wed, 1 Jan 2025 08:11:48 -0500 Subject: [PATCH 5/5] Fix formatting --- examples/pg-memory.py | 11 +++--- src/controlflow/memory/memory.py | 1 - src/controlflow/memory/providers/postgres.py | 38 ++++++++------------ 3 files changed, 19 insertions(+), 31 deletions(-) diff --git a/examples/pg-memory.py b/examples/pg-memory.py index 57d50bf6..6087fb06 100644 --- a/examples/pg-memory.py +++ b/examples/pg-memory.py @@ -1,19 +1,18 @@ +import controlflow as cf from controlflow.memory.memory import Memory from controlflow.memory.providers.postgres import PostgresMemory -import controlflow as cf - provider = PostgresMemory( database_url="postgresql://postgres:postgres@localhost:5432/your_database", - #embedding_dimension=1536, - # embedding_fn=OpenAIEmbeddings(), + # embedding_dimension=1536, + # embedding_fn=OpenAIEmbeddings(), table_name="vector_db", ) # Create a memory module for user preferences user_preferences = cf.Memory( - key="user_preferences", + key="user_preferences", instructions="Store and retrieve user preferences.", - provider=provider + provider=provider, ) # Create an agent with access to the memory diff --git a/src/controlflow/memory/memory.py b/src/controlflow/memory/memory.py index 84849c8d..1e0dd840 100644 --- a/src/controlflow/memory/memory.py +++ b/src/controlflow/memory/memory.py @@ -166,7 +166,6 @@ def get_memory_provider(provider: str) -> MemoryProvider: return lance_providers.LanceMemory() - # --- Postgres --- elif provider.startswith("postgres"): try: diff --git a/src/controlflow/memory/providers/postgres.py b/src/controlflow/memory/providers/postgres.py index d1775a23..6887722a 100644 --- a/src/controlflow/memory/providers/postgres.py +++ b/src/controlflow/memory/providers/postgres.py @@ -2,14 +2,13 @@ from typing import Callable, Dict, Optional import sqlalchemy -from sqlalchemy import Column, String, select, text -from sqlalchemy.orm import declarative_base, Session, sessionmaker -from sqlalchemy_utils import database_exists, create_database -from sqlalchemy.exc import ProgrammingError from pgvector.sqlalchemy import Vector from pydantic import Field +from sqlalchemy import Column, String, select, text from sqlalchemy.dialects.postgresql import ARRAY -from sqlalchemy import select +from sqlalchemy.exc import ProgrammingError +from sqlalchemy.orm import Session, declarative_base, sessionmaker +from sqlalchemy_utils import create_database, database_exists import controlflow from controlflow.memory.memory import MemoryProvider @@ -26,17 +25,19 @@ # SQLAlchemy base class for declarative models Base = declarative_base() + class SQLMemoryTable(Base): """ A simple declarative model that represents a memory record. - + We’ll dynamically set the __tablename__ at runtime. """ + __abstract__ = True id = Column(String, primary_key=True) text = Column(String) # Use pgvector for storing embeddings in a Postgres Vector column - #vector = Column(Vector(dim=1536)) # Adjust dimension to match your embedding model + # vector = Column(Vector(dim=1536)) # Adjust dimension to match your embedding model class PostgresMemory(MemoryProvider): @@ -61,25 +62,22 @@ class PostgresMemory(MemoryProvider): embedding_dimension: int = Field( default=1536, - description="Dimension of the embedding vectors. Match your model's output." + description="Dimension of the embedding vectors. Match your model's output.", ) embedding_fn: Callable = Field( default_factory=lambda: OpenAIEmbeddings( model="text-embedding-ada-002", ), - description="A function that turns a string into a vector." + description="A function that turns a string into a vector.", ) - - # Internal: keep a cached Session maker _SessionLocal: Optional[sessionmaker] = None # This dict will map "table_name" -> "model class" _table_class_cache: Dict[str, Base] = {} - def configure(self, memory_key: str) -> None: """ Configure a SQLAlchemy session and ensure the table for this @@ -87,7 +85,6 @@ def configure(self, memory_key: str) -> None: """ engine = sqlalchemy.create_engine(self.database_url) - # 2) If DB doesn't exist, create it! if not database_exists(engine.url): create_database(engine.url) @@ -111,7 +108,6 @@ def configure(self, memory_key: str) -> None: "__tablename__": table_name, "vector": Column(Vector(dim=self.embedding_dimension)), }, - ) try: @@ -121,14 +117,12 @@ def configure(self, memory_key: str) -> None: except ProgrammingError as e: raise RuntimeError(f"Failed to create table {table_name}: {e}") - def _get_session(self) -> Session: if not self._SessionLocal: raise RuntimeError( "Session is not initialized. Make sure to call configure() first." ) return self._SessionLocal() - def _get_table(self, memory_key: str) -> Base: """ @@ -193,15 +187,11 @@ def search(self, memory_key: str, query: str, n: int = 20) -> Dict[str, str]: query_embedding = self.embedding_fn.embed_query(query) embedding_col = model_cls.vector - with self._get_session() as session: - results = session.execute( - select(model_cls.id, model_cls.text) - .order_by(embedding_col.l2_distance(query_embedding)) - .limit(n) - ).all() + select(model_cls.id, model_cls.text) + .order_by(embedding_col.l2_distance(query_embedding)) + .limit(n) + ).all() return {row.id: row.text for row in results} - -