Data Refinery

A production-ready, AI-powered data cleaning pipeline with schema validation, data profiling, and multi-agent workflows.

Features

  • Multi-Format Support: CSV, JSON, Parquet, Excel, XML, Avro, ORC
  • AI-Powered Cleaning: LLM-based structuring, normalization, and layout agents
  • Schema Validation: JSON schema-based validation with regex patterns and dependencies
  • Data Profiling: Comprehensive column profiling with statistics and distributions
  • Multiple Cleaning Presets: quick_clean, standard_clean, deep_clean
  • REST API: FastAPI service for programmatic access
  • CLI Interface: Command-line tools for batch processing
  • Pipeline Orchestration: Checkpoint-based pipeline with retry logic

Architecture

data-refinery/
├── agents/              # AI agents for intelligent processing
│   ├── structuring_agent.py
│   ├── normalization_agent.py
│   ├── layout_agent.py
│   └── human_agent.py
├── cleaner.py           # Main cleaning pipeline
├── profiler.py          # Data profiling utilities
├── validation.py        # Schema validation
├── orchestrator.py      # Multi-agent pipeline orchestration
├── api.py               # FastAPI REST service
├── csv_cleaner.py       # CSV-specific cleaning
├── config.py            # Configuration settings
├── cleaning_config.py   # Configurable cleaning rules
└── examples/            # Usage examples

Installation

# Clone the repository
git clone https://github.com/massaindustries/data-refinery.git
cd data-refinery

# Create virtual environment
python -m venv venv
source venv/bin/activate  # Linux/Mac
# or
.\venv\Scripts\activate   # Windows

# Install dependencies
pip install -r requirements.txt

# Set up environment variables
cp .env.example .env
# Edit .env with your API keys
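
The dependency list includes the OpenAI client and python-dotenv, so the AI agents will typically need at least an API key in .env; the authoritative variable names are in .env.example. A hypothetical minimal .env:

# Hypothetical values; check .env.example for the actual variable names
OPENAI_API_KEY=sk-...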

Configuration

Create a cleaning_rules.yaml file for custom cleaning rules:

global_options:
  enable_profiling: true
  enable_missing_handling: true
  enable_deduplication: true
  enable_outlier_detection: true
  enable_normalization: true

imputation:
  enabled: true
  numeric_strategy: median
  categorical_strategy: mode

deduplication:
  enabled: true
  remove_duplicate_rows: true
  keep: first

outlier_handling:
  enabled: true
  method: lof
  action: remove

normalization:
  enabled: true
  standard_scaling:
    enabled: true
    columns: []
  string_normalization:
    enabled: true
    lowercase: true
    strip_whitespace: true
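
These rules are consumed by the pipeline via cleaning_config.py. A minimal sketch, assuming PyYAML, of reading the file and checking one stage switch; the actual loader may expose a different interface:

# Minimal sketch (assumes PyYAML); the real loader in cleaning_config.py
# may expose a different interface.
import yaml

with open("cleaning_rules.yaml") as f:
    rules = yaml.safe_load(f)

# global_options gates each pipeline stage
if rules["global_options"]["enable_deduplication"]:
    keep = rules["deduplication"].get("keep", "first")  # "first" or "last"
    print(f"Deduplicating rows, keeping the {keep} occurrence")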

Usage

CLI

# Simple cleaning
python cleaner.py --input data.csv --output cleaned.csv

# With profiling
python cleaner.py --input data.csv --output cleaned.csv --profile report.json

# With validation schema
python cleaner.py --input data.csv --output cleaned.csv --schema schema.json

# Using presets
python cleaner.py --input data.csv --output cleaned.csv --preset standard_clean

# Verbose mode
python cleaner.py --input data.csv --output cleaned.csv --verbose

Python API

from cleaner import clean_file
from profiler import profile_dataframe
from validation import validate_dataframe

# Clean a file
result = clean_file(
    input_path="data.csv",
    output_path="cleaned.csv",
    config_path="cleaning_rules.yaml",
    preset="standard_clean",
    profile_output="profile.json",
    schema_path="schema.json"
)

# Profile data
from profiler import profile_dataframe, profile_to_dict
import pandas as pd

df = pd.read_csv("data.csv")
profile = profile_dataframe(df, "data.csv", ".csv")
profile_dict = profile_to_dict(profile)

# Validate data
from validation import load_schema, validate_dataframe
schema = load_schema("schema.json")
result = validate_dataframe(df, schema)
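
The schema file follows JSON Schema conventions; a hypothetical minimal schema.json exercising a regex pattern and a field dependency (field names are illustrative):

{
  "type": "object",
  "properties": {
    "email": {"type": "string", "pattern": "^[^@]+@[^@]+\\.[^@]+$"},
    "country": {"type": "string"},
    "postal_code": {"type": "string"}
  },
  "required": ["email"],
  "dependencies": {"postal_code": ["country"]}
}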

REST API

# Start the server
python api.py

# Health check
curl http://localhost:8000/health

# Clean a file
curl -X POST -F "file=@data.csv" http://localhost:8000/clean

# Profile a file
curl -X POST -F "file=@data.csv" http://localhost:8000/profile

# Validate a file
curl -X POST -F "file=@data.csv" http://localhost:8000/validate

# Get metrics
curl http://localhost:8000/metrics
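
The same endpoints can be called from any HTTP client; a short sketch using Python's requests library (the shape of the JSON response is an assumption, not a documented contract):

# Sketch of a multipart upload to /clean using requests; the fields in the
# JSON response are an assumption here, not a documented contract.
import requests

with open("data.csv", "rb") as f:
    resp = requests.post("http://localhost:8000/clean", files={"file": f})

resp.raise_for_status()
print(resp.json())  # inspect the service's actual response shape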

Multi-Agent Pipeline

from orchestrator import Orchestrator
from pathlib import Path

orchestrator = Orchestrator(output_dir=Path("output"))
orchestrator.initialize("document.txt")
success = orchestrator.run_pipeline()

if success:
    state = orchestrator.get_state()
    print(f"Processed {len(state.completed_agents)} agents")

Presets

Preset          Description
quick_clean     Basic cleaning with minimal processing
standard_clean  Balanced cleaning with outlier detection
deep_clean      Comprehensive cleaning with all features enabled

File Formats

Format   Extensions
CSV      .csv
JSON     .json, .jsn
Parquet  .parquet
Excel    .xlsx, .xls
XML      .xml
Avro     .avro
ORC      .orc

Cleaning Operations

  1. Missing Value Handling: Mean/median/mode imputation, constant values
  2. Deduplication: Remove duplicate rows and columns
  3. Outlier Detection: Isolation Forest, LOF, Z-score, IQR
  4. Normalization: Standard scaling, string normalization
  5. Type Inference: Automatic data type detection
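
A minimal pandas sketch of the first four operations (illustrative only; cleaner.py drives these stages from cleaning_rules.yaml rather than hard-coding them):

# Illustrative sketch; the real pipeline configures these stages via YAML.
import pandas as pd

df = pd.read_csv("data.csv")

# 1. Missing values: median for numeric columns, mode for categorical ones
for col in df.select_dtypes("number"):
    df[col] = df[col].fillna(df[col].median())
for col in df.select_dtypes("object"):
    if not df[col].mode().empty:
        df[col] = df[col].fillna(df[col].mode()[0])

# 2. Deduplication: keep the first occurrence of each duplicate row
df = df.drop_duplicates(keep="first")

# 3. Outlier removal via IQR: drop rows outside 1.5 * IQR per numeric column
for col in df.select_dtypes("number"):
    q1, q3 = df[col].quantile([0.25, 0.75])
    iqr = q3 - q1
    df = df[df[col].between(q1 - 1.5 * iqr, q3 + 1.5 * iqr)]

# 4. String normalization: lowercase and strip surrounding whitespace
for col in df.select_dtypes("object"):
    df[col] = df[col].str.lower().str.strip()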

API Endpoints

Endpoint   Method  Description
/health    GET     Health check
/          GET     API information
/clean     POST    Clean a data file
/profile   POST    Profile a data file
/validate  POST    Validate against schema
/metrics   GET     Get dashboard metrics
/presets   GET     List available presets

Output

The pipeline produces:

  • Cleaned data file in the same format as input
  • Profile report (JSON) with column statistics
  • Validation report with errors and warnings
  • Metrics summary with cleaning statistics

Dependencies

  • pandas>=2.2,<3
  • numpy
  • scikit-learn
  • fastapi
  • uvicorn
  • pyarrow
  • openai>=1.0,<2
  • python-dotenv
  • And more... (see requirements.txt)

License

MIT License
