A production-ready, multi-agent AI-powered data cleaning pipeline with schema validation, profiling, and intelligent agent workflows.
- Multi-Format Support: CSV, JSON, Parquet, Excel, XML, Avro, ORC
- AI-Powered Cleaning: LLM-based structuring, normalization, and layout agents
- Schema Validation: JSON Schema-based validation with regex patterns and dependencies (see the example schema after this list)
- Data Profiling: Comprehensive column profiling with statistics and distributions
- Multiple Cleaning Presets: quick_clean, standard_clean, deep_clean
- REST API: FastAPI service for programmatic access
- CLI Interface: Command-line tools for batch processing
- Pipeline Orchestration: Checkpoint-based pipeline with retry logic
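For instance, a schema.json passed to the validator might look like the following. This is a hypothetical sketch using standard JSON Schema keywords (`pattern` for regex checks, `dependentRequired` for dependencies); the field names are illustrative, and the exact schema dialect accepted by validation.py may differ:

```json
{
  "type": "object",
  "properties": {
    "email": { "type": "string", "pattern": "^[^@\\s]+@[^@\\s]+\\.[^@\\s]+$" },
    "age": { "type": "integer", "minimum": 0 },
    "country": { "type": "string" }
  },
  "required": ["email"],
  "dependentRequired": { "country": ["age"] }
}
```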
Project layout:

```
data-refinery/
├── agents/              # AI agents for intelligent processing
│   ├── structuring_agent.py
│   ├── normalization_agent.py
│   ├── layout_agent.py
│   └── human_agent.py
├── cleaner.py           # Main cleaning pipeline
├── profiler.py          # Data profiling utilities
├── validation.py        # Schema validation
├── orchestrator.py      # Multi-agent pipeline orchestration
├── api.py               # FastAPI REST service
├── csv_cleaner.py       # CSV-specific cleaning
├── config.py            # Configuration settings
├── cleaning_config.py   # Configurable cleaning rules
└── examples/            # Usage examples
```
Installation:

```bash
# Clone the repository
cd projects/data-refinery

# Create virtual environment
python -m venv venv
source venv/bin/activate   # Linux/Mac
# or
.\venv\Scripts\activate    # Windows

# Install dependencies
pip install -r requirements.txt

# Set up environment variables
cp .env.example .env
# Edit .env with your API keys
```
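The keys from .env can then be read in Python via python-dotenv, which is among the dependencies. A minimal sketch; the variable name is hypothetical, so check .env.example for the actual names:

```python
import os
from dotenv import load_dotenv  # python-dotenv, listed in requirements.txt

load_dotenv()  # reads key=value pairs from .env into the environment
api_key = os.getenv("OPENAI_API_KEY")  # hypothetical name; see .env.example
```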
Create a cleaning_rules.yaml file for custom cleaning rules:

```yaml
global_options:
  enable_profiling: true
  enable_missing_handling: true
  enable_deduplication: true
  enable_outlier_detection: true
  enable_normalization: true

imputation:
  enabled: true
  numeric_strategy: median
  categorical_strategy: mode

deduplication:
  enabled: true
  remove_duplicate_rows: true
  keep: first

outlier_handling:
  enabled: true
  method: lof
  action: remove

normalization:
  enabled: true
  standard_scaling:
    enabled: true
    columns: []
  string_normalization:
    enabled: true
    lowercase: true
    strip_whitespace: true
```
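As a rough illustration (not the pipeline's internal code), the median and mode imputation strategies configured above behave like this in pandas:

```python
import pandas as pd

df = pd.DataFrame({"age": [25, None, 31], "city": ["NY", None, "NY"]})

# numeric_strategy: median — fill numeric gaps with the column median
df["age"] = df["age"].fillna(df["age"].median())

# categorical_strategy: mode — fill categorical gaps with the most frequent value
df["city"] = df["city"].fillna(df["city"].mode()[0])
```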
Run the cleaner from the command line:

```bash
# Simple cleaning
python cleaner.py --input data.csv --output cleaned.csv
# With profiling
python cleaner.py --input data.csv --output cleaned.csv --profile report.json
# With validation schema
python cleaner.py --input data.csv --output cleaned.csv --schema schema.json
# Using presets
python cleaner.py --input data.csv --output cleaned.csv --preset standard_clean
# Verbose mode
python cleaner.py --input data.csv --output cleaned.csv --verbose
```

Use the pipeline programmatically from Python:

```python
from cleaner import clean_file
from profiler import profile_dataframe
from validation import validate_dataframe
# Clean a file
result = clean_file(
    input_path="data.csv",
    output_path="cleaned.csv",
    config_path="cleaning_rules.yaml",
    preset="standard_clean",
    profile_output="profile.json",
    schema_path="schema.json",
)
# Profile data
from profiler import profile_dataframe, profile_to_dict
import pandas as pd
df = pd.read_csv("data.csv")
profile = profile_dataframe(df, "data.csv", ".csv")
profile_dict = profile_to_dict(profile)
# Validate data
from validation import load_schema, validate_dataframe
schema = load_schema("schema.json")
result = validate_dataframe(df, schema)
```

Start the FastAPI service and interact with it over HTTP:

```bash
# Start the server
python api.py
# Health check
curl http://localhost:8000/health
# Clean a file
curl -X POST -F "file=@data.csv" http://localhost:8000/clean
# Profile a file
curl -X POST -F "file=@data.csv" http://localhost:8000/profile
# Validate a file
curl -X POST -F "file=@data.csv" http://localhost:8000/validate
# Get metrics
curl http://localhost:8000/metrics
```
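The same endpoints can be called from Python. A minimal client sketch using the third-party requests library; the JSON response shape is an assumption:

```python
import requests

BASE_URL = "http://localhost:8000"

# Upload a file to /clean as multipart form data, mirroring the curl call above
with open("data.csv", "rb") as f:
    resp = requests.post(f"{BASE_URL}/clean", files={"file": f})
resp.raise_for_status()
print(resp.json())  # assumed to return JSON; FastAPI also serves interactive docs at /docs by default
```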
To run the full multi-agent pipeline programmatically, use the orchestrator:

```python
from orchestrator import Orchestrator
from pathlib import Path

orchestrator = Orchestrator(output_dir=Path("output"))
orchestrator.initialize("document.txt")

success = orchestrator.run_pipeline()
if success:
    state = orchestrator.get_state()
    print(f"Processed {len(state.completed_agents)} agents")
```

The available cleaning presets:

| Preset | Description |
|---|---|
| quick_clean | Basic cleaning with minimal processing |
| standard_clean | Balanced cleaning with outlier detection |
| deep_clean | Comprehensive cleaning with all features enabled |
Supported file formats:

| Format | Extension | Read | Write |
|---|---|---|---|
| CSV | .csv | ✅ | ✅ |
| JSON | .json, .jsn | ✅ | ✅ |
| Parquet | .parquet | ✅ | ✅ |
| Excel | .xlsx, .xls | ✅ | ✅ |
| XML | .xml | ✅ | ✅ |
| Avro | .avro | ✅ | ✅ |
| ORC | .orc | ✅ | ✅ |
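As an illustration of how multi-format loading can work (the pipeline's actual loader may differ; Avro is omitted because pandas has no built-in reader for it and a third-party library such as fastavro is needed):

```python
from pathlib import Path
import pandas as pd

# Map file extensions to pandas readers; illustration only
READERS = {
    ".csv": pd.read_csv,
    ".json": pd.read_json,
    ".parquet": pd.read_parquet,
    ".xlsx": pd.read_excel,
    ".xls": pd.read_excel,
    ".xml": pd.read_xml,
    ".orc": pd.read_orc,
}

def load(path: str) -> pd.DataFrame:
    return READERS[Path(path).suffix.lower()](path)
```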
Built-in cleaning operations:

- Missing Value Handling: Mean/median/mode imputation, constant values
- Deduplication: Remove duplicate rows and columns
- Outlier Detection: Isolation Forest, LOF, Z-score, IQR (the IQR rule is sketched after this list)
- Normalization: Standard scaling, string normalization
- Type Inference: Automatic data type detection
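For intuition, the IQR method flags values outside [Q1 − 1.5·IQR, Q3 + 1.5·IQR]. A minimal sketch, not the pipeline's internal implementation:

```python
import pandas as pd

def iqr_filter(df: pd.DataFrame, column: str, k: float = 1.5) -> pd.DataFrame:
    """Drop rows whose value in `column` falls outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, q3 = df[column].quantile([0.25, 0.75])
    iqr = q3 - q1
    return df[df[column].between(q1 - k * iqr, q3 + k * iqr)]

df = pd.DataFrame({"price": [10, 12, 11, 13, 500]})
print(iqr_filter(df, "price"))  # the 500 outlier row is dropped
```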
The REST API exposes the following endpoints:

| Endpoint | Method | Description |
|---|---|---|
| /health | GET | Health check |
| / | GET | API information |
| /clean | POST | Clean a data file |
| /profile | POST | Profile a data file |
| /validate | POST | Validate against schema |
| /metrics | GET | Get dashboard metrics |
| /presets | GET | List available presets |
The pipeline produces:
- Cleaned data file in the same format as input
- Profile report (JSON) with column statistics
- Validation report with errors and warnings
- Metrics summary with cleaning statistics
Key dependencies:

- pandas>=2.2,<3
- numpy
- scikit-learn
- fastapi
- uvicorn
- pyarrow
- openai>=1.0,<2
- python-dotenv
- See requirements.txt for the full list
MIT License