A complete Docker-based environment for testing RAG pipelines and LLM agents against data warehouse snapshots. Built with DuckDB, LightRAG, Marimo, and modern Python tooling.
This project provides:
- Isolated testing of RAG systems using DuckDB snapshots (no impact on production)
- Incremental indexing with LightRAG (vector + knowledge graph hybrid search)
- Interactive validation via Marimo notebooks
- Automated pipelines with Cronicle scheduling
- Text-to-SQL agents with graceful fallback to live databases
```bash
# 1. Clone/download the project
cd rag-testing-env

# 2. Set up environment variables
cp .env.example .env
# Edit .env and add your OPENAI_API_KEY

# 3. Start all services
docker-compose up -d

# 4. Access Marimo notebooks
open http://localhost:8080
```

```
Production Postgres → DuckDB Snapshot → Documents (CSV/JSON)
                                              ↓
                                       LightRAG Index
                                      (Vector + Graph)
                                              ↓
                                         LLM Agents
                                     (Text-to-SQL, Q&A)
```
- Docker & Docker Compose
- OpenAI API key (or configure for local models)
- 4GB+ RAM recommended
- 10GB+ disk space
```
rag-testing-env/
├── docker-compose.yml        # All services orchestration
├── Dockerfile                # Python app container
├── requirements.txt          # Python dependencies
├── .env.example              # Environment template
├── data/
│   ├── warehouse/            # Postgres init SQL (sample data)
│   ├── snapshots/            # DuckDB snapshot files
│   ├── documents/            # Converted documents (CSV, JSON)
│   └── lightrag/             # LightRAG vector/graph storage
├── notebooks/                # Marimo interactive notebooks
│   ├── 01_snapshot.py        # Create snapshot & convert
│   ├── 02_validate_kg.py     # Index & validate KG
│   └── 03_test_agent.py      # Test Text-to-SQL agent
├── pipelines/                # Cronicle automation jobs
│   ├── snapshot_job.py       # Scheduled snapshot creation
│   ├── index_job.py          # Scheduled indexing
│   └── validation_job.py     # Scheduled validation
└── src/                      # Core Python modules
    ├── snapshot.py           # Postgres → DuckDB snapshotting
    ├── converter.py          # DuckDB → documents conversion
    ├── indexer.py            # LightRAG indexing (async)
    ├── validator.py          # Knowledge graph validation
    └── agent.py              # Text-to-SQL LLM agent
```
The docker-compose stack includes:
| Service | Port | Purpose |
|---|---|---|
| Postgres | 5432 (internal) / 5438 (host) | Sample data warehouse |
| Redis | 6379 | Caching layer |
| Cronicle | 3012 | Job scheduler & orchestrator |
| App (Marimo) | 8080 | Interactive Python notebooks |
Create a `.env` file:

```bash
cp .env.example .env
```

Edit `.env` and add:

```
OPENAI_API_KEY=sk-your-openai-api-key-here
```

```bash
# Start everything
docker-compose up -d

# Check status
docker-compose ps

# View logs
docker-compose logs -f app
```

```bash
# Test Postgres connection
docker-compose exec postgres psql -U user -d warehouse -c "SELECT COUNT(*) FROM customers;"

# Test Python environment
docker-compose exec app python3 -c "import lightrag; print('LightRAG OK')"

# Test OpenAI connection (if configured)
docker-compose exec app python3 -c "import openai; print('OpenAI OK')"
```

Open http://localhost:8080 and select `01_snapshot.py`
The notebook automatically:
- Connects to the Postgres warehouse
- Creates a DuckDB snapshot at `/app/data/snapshots/warehouse.duckdb`
- Converts tables to CSV and JSON documents
- Saves them to `/app/data/documents/`
Expected Output:

```
✅ Snapshot Created!
Path: /app/data/snapshots/warehouse.duckdb

✅ Documents Created!
Total: 9 documents
```
What's happening:
- Tables `customers`, `orders`, `products` → DuckDB
- Each row becomes a JSON document for RAG
- CSV exports are kept for human inspection
Open 02_validate_kg.py
The notebook automatically:
- Loads JSON documents from Step 1
- Indexes into LightRAG (async initialization)
- Creates vector embeddings + knowledge graph
- Validates entity extraction and relationships
Expected Output:

```
✅ Loaded 9 documents
✅ Indexed 9 documents
✅ Validation Complete!
  - Entities: 45
  - Relations: 38
  - Orphans: 2
  - Accuracy: 95.6%
```
What's happening:
- LightRAG extracts entities (customers, products, etc.)
- Builds relationship graph (customer → orders → products)
- Validates graph structure for completeness
Open 03_test_agent.py
The notebook:
- Initializes Text-to-SQL agent
- Provides text input for natural language queries
- Converts to SQL using LLM + RAG context
- Executes against DuckDB snapshot
- Falls back to live warehouse if snapshot fails
Example Usage:

Query: "How many customers do we have?"

✅ Success!

SQL:

```sql
SELECT COUNT(*) as customer_count FROM customers
```

Results:

| customer_count |
|---|
| 3 |
Try these queries:
- "What is the total revenue from completed orders?"
- "List all products in the Electronics category"
- "Which customer has the highest order amount?"
- "Show me pending orders with customer names"
Once validated manually, schedule automated jobs:

- Visit http://localhost:3012
- Login: `admin` / `admin`
- Create the following jobs:

Daily Snapshot:
- Schedule: `0 2 * * *` (2 AM daily)
- Command: `/opt/cronicle/plugins/pipelines/snapshot_job.py`
- Creates a fresh snapshot every night

Hourly Indexing:
- Schedule: `0 * * * *` (every hour)
- Command: `/opt/cronicle/plugins/pipelines/index_job.py`
- Updates the RAG index with new data

Post-Index Validation:
- Trigger: after the indexing job completes
- Command: `/opt/cronicle/plugins/pipelines/validation_job.py`
- Validates knowledge graph quality
```bash
# Verify snapshot exists
docker-compose exec app ls -lh /app/data/snapshots/

# Inspect snapshot
docker-compose exec app python3 << EOF
import duckdb
conn = duckdb.connect('/app/data/snapshots/warehouse.duckdb', read_only=True)
print("Tables:", conn.execute("SHOW TABLES").fetchall())
print("Rows:", conn.execute("SELECT COUNT(*) FROM customers").fetchone())
conn.close()
EOF
```

```bash
# List converted files
docker-compose exec app ls -lh /app/data/documents/

# Inspect JSON structure
docker-compose exec app head -20 /app/data/documents/customers.json
```

```bash
# Check index files
docker-compose exec app ls -lh /app/data/lightrag/

# View graph structure (if available)
docker-compose exec app python3 -c "
from lightrag import LightRAG
rag = LightRAG(working_dir='/app/data/lightrag')
print('Index loaded successfully')
"
```

`src/snapshot.py` creates isolated DuckDB copies of Postgres tables:
- Connects to Postgres warehouse
- Iterates through all tables
- Copies data to DuckDB file
- Deletes the old snapshot if one exists
Why? Test RAG without impacting production database.
`src/converter.py` transforms DuckDB tables into documents:
- Exports tables to CSV (human-readable)
- Creates JSON documents per row (for RAG)
- Includes metadata (table name, row ID)
Why? RAG systems need document format, not SQL tables.
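The row-to-document shape can be sketched as follows (field names here are illustrative assumptions, not the exact schema `src/converter.py` produces):

```python
import json

def rows_to_documents(table, columns, rows):
    """Turn each table row into a self-describing JSON document."""
    docs = []
    for i, row in enumerate(rows):
        record = dict(zip(columns, row))
        docs.append({
            "id": f"{table}:{i}",  # metadata: table name + row ID
            "table": table,
            # Flat key=value text is what the RAG index actually embeds
            "text": ", ".join(f"{k}={v}" for k, v in record.items()),
            "record": record,
        })
    return docs

docs = rows_to_documents("customers", ["id", "name"], [(1, "Ada")])
print(json.dumps(docs[0], indent=2))
```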
`src/indexer.py` performs async LightRAG indexing:
- Uses OpenAI embeddings via `openai_embed`
- Uses GPT-4o-mini for completions via `gpt_4o_mini_complete`
- Initializes storage + pipeline status
- Supports incremental updates
Important: uses the async/await pattern with `nest_asyncio` for Marimo compatibility.
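The pattern looks like this, with `index_batch` standing in for the real LightRAG calls (the actual awaited calls are in `src/indexer.py`):

```python
import asyncio

import nest_asyncio

nest_asyncio.apply()  # allow asyncio.run() inside Marimo's already-running loop

async def index_batch(documents):
    # A real implementation would await rag.initialize_storages(),
    # await initialize_pipeline_status(), then await rag.ainsert(documents)
    await asyncio.sleep(0)  # placeholder for the awaited LightRAG calls
    return len(documents)

# Works both in plain scripts and inside a Marimo cell
indexed = asyncio.run(index_batch(["doc-1", "doc-2"]))
```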
`src/validator.py` runs knowledge graph quality checks:
- Entity count
- Relation count
- Orphan node detection (entities without connections)
- Extraction accuracy metric
Why? Ensures RAG is extracting meaningful information.
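The checks can be sketched as below. The graph shapes and the accuracy definition (share of non-orphan entities) are assumptions for illustration; the real metrics live in `src/validator.py`.

```python
def validate_graph(entities, relations):
    """entities: set of entity names; relations: list of (source, target) pairs."""
    connected = {a for a, _ in relations} | {b for _, b in relations}
    orphans = entities - connected  # entities with no connections at all
    accuracy = 100.0 * (len(entities) - len(orphans)) / max(len(entities), 1)
    return {"entities": len(entities), "relations": len(relations),
            "orphans": len(orphans), "accuracy": round(accuracy, 1)}

# customer → order → product, all connected: no orphans
report = validate_graph({"Ada", "Order-1", "Widget"},
                        [("Ada", "Order-1"), ("Order-1", "Widget")])
```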
`src/agent.py` handles Text-to-SQL conversion:
- Takes a natural language query
- Uses an LLM to generate SQL
- Executes it against the DuckDB snapshot
- Falls back to the live warehouse on failure
Why? Makes data accessible via natural language.
Solution: hard-refresh the browser.

```
# Windows/Linux: Ctrl + Shift + R
# Mac: Cmd + Shift + R
```

Solution: delete the old snapshot.

```bash
docker-compose exec app rm -f /app/data/snapshots/warehouse.duckdb
```

Solution: ensure `initialize_pipeline_status()` is called in `src/indexer.py`:

```python
await rag.initialize_storages()
await initialize_pipeline_status()  # Must be called after initialize_storages()
```

Solution: already fixed with `nest_asyncio` in the indexer. If it still occurs:

```bash
# Rebuild container
docker-compose down
docker-compose build --no-cache app
docker-compose up -d
```

Solution: check the API key.
```bash
docker-compose exec app env | grep OPENAI_API_KEY
# Should show: OPENAI_API_KEY=sk-...

# If missing, add to .env and restart
docker-compose restart app
```

Solution: check that Postgres is running.

```bash
docker-compose ps postgres
docker-compose logs postgres

# Restart if needed
docker-compose restart postgres
```

Edit the connection string in the notebooks:

```python
# notebooks/01_snapshot.py
pg_conn = "postgresql://your_user:your_pass@your_host:5432/your_db"
```

Modify `src/indexer.py` to use local embeddings:
```python
from lightrag.llm import ollama_model_complete, ollama_embedding

rag = LightRAG(
    working_dir=storage_dir,
    embedding_func=ollama_embedding,
    llm_model_func=ollama_model_complete,
)
```

Extend `src/converter.py` to support more formats:
```python
# Add PDF support
from pypdf import PdfReader

def convert_to_pdf(df, output_path):
    # Your PDF conversion logic
    pass
```

Add business logic to `src/validator.py`:
```python
def validate_business_rules(rag, rules):
    # Check for required entities
    # Validate relationship constraints
    # Return compliance report
    pass
```

- Sample data instead of a full snapshot:

```python
# In src/snapshot.py
cur.execute(f"SELECT * FROM {table} LIMIT 10000")
```

- Batch document processing:
```python
# In src/indexer.py
# chunk_documents: a helper you define that yields slices of batch_size docs
for batch in chunk_documents(documents, batch_size=100):
    await rag.ainsert(batch)
```

- Parallel table conversion:
```python
# In src/converter.py
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    executor.map(convert_table, tables)
```

- Use smaller embedding models
- Reduce chunk size in LightRAG config
- Skip validation step in production runs
- Change default passwords:

```yaml
# docker-compose.yml
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}  # Use env var
```

- Add authentication to Marimo:
```dockerfile
# Dockerfile
CMD ["marimo", "edit", "--host", "0.0.0.0", "--port", "8080", "--auth"]
```

- Restrict network access:
```yaml
# docker-compose.yml
services:
  app:
    networks:
      - internal

networks:
  internal:
    internal: true  # No external access
```

- Encrypt snapshots at rest:
```bash
# Use encrypted volumes
docker volume create --driver local \
  --opt type=none \
  --opt device=/encrypted/path \
  --opt o=bind \
  encrypted_snapshots
```

Track snapshot schemas:
```python
# In pipelines/snapshot_job.py
import hashlib

schema = get_database_schema()
schema_hash = hashlib.sha256(schema.encode()).hexdigest()
save_schema_version(schema_hash)
```

Compare RAG configurations:
```python
# Create two indexes with different settings
rag_v1 = LightRAG(working_dir="./v1", chunk_size=512)
rag_v2 = LightRAG(working_dir="./v2", chunk_size=1024)

# Compare results
results_v1 = rag_v1.query("test query")
results_v2 = rag_v2.query("test query")
```

Separate storage per tenant:
```python
# In src/indexer.py
def index_documents(documents, storage_dir, tenant_id):
    tenant_dir = f"{storage_dir}/{tenant_id}"
    rag = LightRAG(working_dir=tenant_dir)
    # ... rest of indexing
```

This is a reference implementation. Key areas for enhancement:
- Additional connectors: MySQL, BigQuery, Snowflake support
- More document formats: Excel, PowerPoint, Confluence
- Advanced validation: Schema drift detection, data quality checks
- UI improvements: Web dashboard, real-time metrics
- Cost optimization: Token usage tracking, caching strategies
Q: Can I use this with production databases?
A: Yes, but use read replicas and schedule snapshots during low-traffic periods.
Q: How often should I refresh snapshots?
A: Depends on data velocity. Daily for slowly-changing data, hourly for dynamic datasets.
Q: What if my database is too large?
A: Implement sampling, partition by date ranges, or focus on specific tables.
Q: Can I use other LLM providers?
A: Yes, modify src/indexer.py and src/agent.py to use Anthropic, Cohere, or local models.
Q: How do I scale this horizontally?
A: Deploy multiple app containers with shared storage (NFS/S3) and load balancing.
Q: What about cost control?
A: Use local models for embeddings, cache LLM responses, implement rate limiting.
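The "cache LLM responses" suggestion can lean on the Redis service already in the stack. A minimal sketch, assuming a redis-py-style client (`get`/`setex`); `call_llm`, the key scheme, and the TTL are illustrative:

```python
import hashlib

def cached_completion(redis_client, prompt, call_llm, ttl=3600):
    """Return a cached answer if one exists; otherwise call the LLM once."""
    key = "llm:" + hashlib.sha256(prompt.encode()).hexdigest()
    hit = redis_client.get(key)
    if hit is not None:
        return hit.decode() if isinstance(hit, bytes) else hit
    answer = call_llm(prompt)             # only pay for cache misses
    redis_client.setex(key, ttl, answer)  # expire so stale answers age out
    return answer
```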
MIT
For issues and questions:
- Check the Troubleshooting section above
- Review logs with `docker-compose logs`
- Inspect notebook errors in the browser
- Test components individually as shown in the Monitoring section
- LightRAG: https://github.com/HKUDS/LightRAG
- Marimo: https://marimo.io
- DuckDB: https://duckdb.org
- Cronicle: https://github.com/jhuckaby/Cronicle