
Universal MCP Server for Data & AI Engineering

Stop arguing about file formats. This MCP server ingests any data file and makes it queryable instantly.

Upload CSV, JSON, Excel, Parquet, or Avro: the system auto-detects the format, routes it to the right parser, and loads it into a query engine.

No configuration. No format wars. No custom loaders.


🎯 What This Solves

Every data and AI team wastes time on:

  • ❌ Debating which format to use (CSV vs Parquet vs JSON)
  • ❌ Writing custom parsers for every data source
  • ❌ "We don't support that format" tickets blocking progress
  • ❌ Ad-hoc scripts breaking in production
  • ❌ Friction between business users and engineers

This MCP server eliminates all of that.


✨ Features

✅ Auto-format detection - Upload any file; the system figures out the format
✅ Smart routing - Polars for speed, Pandas for compatibility
✅ Instant SQL queries - DuckDB integration; query uploaded data immediately
✅ Zero configuration - Works out of the box
✅ REST API - Easy integration with any tool or LLM
✅ Production-ready - Health checks, error handling, proper logging


📦 Supported Formats

| Format  | Extensions         | Status          |
|---------|--------------------|-----------------|
| CSV     | .csv, .tsv, .txt   | ✅ Full support |
| JSON    | .json, .jsonl      | ✅ Full support |
| Excel   | .xlsx, .xls        | ✅ Full support |
| Parquet | .parquet           | ✅ Full support |
| Avro    | .avro              | ✅ Full support |

🚀 Quick Start

Prerequisites

  • Docker installed (Get Docker)
  • Docker Compose installed (included with Docker Desktop)
  • That's it. No Python setup needed.

Start the Server (3 Commands)

# 1. Navigate to project directory
cd mcp-data-server

# 2. Start the server
docker-compose up --build

# 3. Wait for this message:
# "Application startup complete"

Server is now running at: http://localhost:8000


📘 Basic Usage

1️⃣ Upload a File

Create a sample CSV:

cat > sample.csv << EOF
id,name,department,salary
1,Alice,Engineering,95000
2,Bob,Marketing,75000
3,Charlie,Engineering,105000
4,Diana,Sales,85000
EOF

Upload it:

curl -X POST "http://localhost:8000/upload" \
  -F "file=@sample.csv"

Response you'll get:

{
  "success": true,
  "filename": "sample.csv",
  "data": {
    "table_name": "sample",
    "rows": 4,
    "columns": 4,
    "column_names": ["id", "name", "department", "salary"],
    "format": "csv",
    "preview": [
      {"id": 1, "name": "Alice", "department": "Engineering", "salary": 95000},
      {"id": 2, "name": "Bob", "department": "Marketing", "salary": 75000}
    ]
  }
}

2️⃣ Query Your Data

Simple filter:

curl -X POST "http://localhost:8000/query" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT * FROM sample WHERE department = '\''Engineering'\''"}'

Aggregation:

curl -X POST "http://localhost:8000/query" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT department, AVG(salary) as avg_salary, COUNT(*) as count FROM sample GROUP BY department"}'

Response:

{
  "success": true,
  "rows": 3,
  "data": [
    {"department": "Engineering", "avg_salary": 100000, "count": 2},
    {"department": "Marketing", "avg_salary": 75000, "count": 1},
    {"department": "Sales", "avg_salary": 85000, "count": 1}
  ]
}

3️⃣ List All Tables

curl http://localhost:8000/tables

Returns all uploaded datasets currently loaded.

4️⃣ Check Server Health

curl http://localhost:8000/health

🧪 Run Automated Tests

The project includes a test suite:

# Make test script executable
chmod +x test.sh

# Run all tests
./test.sh

What it tests:

  • ✅ Server health check
  • ✅ CSV upload and query
  • ✅ JSON upload and query
  • ✅ Filtering queries
  • ✅ Aggregation queries
  • ✅ Table listing

🎓 Real-World Examples

Example 1: Excel Report Analysis

# Upload your Excel file
curl -X POST "http://localhost:8000/upload" \
  -F "file=@quarterly_report.xlsx"

# Query it immediately (no conversion needed!)
curl -X POST "http://localhost:8000/query" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT quarter, SUM(revenue) as total_revenue FROM quarterly_report GROUP BY quarter"}'

Example 2: JSON API Data Processing

# Create JSON data (e.g., from an API response)
cat > products.json << EOF
[
  {"id": 1, "name": "Laptop", "price": 1200, "stock": 45},
  {"id": 2, "name": "Mouse", "price": 25, "stock": 150},
  {"id": 3, "name": "Keyboard", "price": 75, "stock": 89}
]
EOF

# Upload
curl -X POST "http://localhost:8000/upload" -F "file=@products.json"

# Find low-price items
curl -X POST "http://localhost:8000/query" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT name, price FROM products WHERE price < 100 ORDER BY price DESC"}'

Example 3: Multi-File Join Analysis

# Upload customers data
curl -X POST "http://localhost:8000/upload" -F "file=@customers.csv"

# Upload orders data
curl -X POST "http://localhost:8000/upload" -F "file=@orders.csv"

# Join across datasets
curl -X POST "http://localhost:8000/query" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT c.name, COUNT(o.id) as order_count, SUM(o.amount) as total_spent FROM customers c LEFT JOIN orders o ON c.id = o.customer_id GROUP BY c.name ORDER BY total_spent DESC"}'

Example 4: Parquet File Processing

# Upload Parquet file (common in data engineering)
curl -X POST "http://localhost:8000/upload" \
  -F "file=@large_dataset.parquet"

# Query instantly - no conversion needed
curl -X POST "http://localhost:8000/query" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT category, COUNT(*) FROM large_dataset GROUP BY category LIMIT 10"}'

🤖 LLM Integration Examples

Python Integration

import requests

def upload_file(filepath: str):
    with open(filepath, 'rb') as f:
        response = requests.post(
            'http://localhost:8000/upload',
            files={'file': f}
        )
    return response.json()

def query_data(sql: str):
    response = requests.post(
        'http://localhost:8000/query',
        json={'sql': sql}
    )
    return response.json()

# Use it
upload_file('sales_data.csv')
result = query_data("SELECT product, SUM(revenue) FROM sales_data GROUP BY product")
print(result)

LangChain Integration

from langchain.tools import Tool
import requests

def query_database(sql_query: str) -> dict:
    response = requests.post(
        "http://localhost:8000/query",
        json={"sql": sql_query}
    )
    return response.json()

# Create tool for your agent
data_query_tool = Tool(
    name="QueryData",
    func=query_database,
    description="Query uploaded datasets using SQL. Input should be a valid SQL query string."
)

# Add to your agent's tools
# Now your LLM can query any uploaded dataset!

OpenAI Function Calling

import openai
import requests

functions = [
    {
        "name": "query_database",
        "description": "Execute SQL query on uploaded data files",
        "parameters": {
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "SQL query to execute on the data"
                }
            },
            "required": ["sql"]
        }
    }
]

def execute_query(sql: str):
    return requests.post(
        "http://localhost:8000/query",
        json={"sql": sql}
    ).json()

# Pass `functions` in your chat completion request; when the model returns a
# "query_database" function call, run execute_query() with the SQL it generated.
# Your AI can now generate and execute queries on any uploaded data

📊 Interactive API Documentation

Once the server is running, explore the API interactively at FastAPI's built-in documentation routes:

  • Swagger UI: http://localhost:8000/docs
  • ReDoc: http://localhost:8000/redoc

Test endpoints directly in your browser!


🔧 Configuration & Customization

Change the Port

Edit docker-compose.yml:

ports:
  - "8001:8000"  # Change 8001 to your desired port

Then restart:

docker-compose down
docker-compose up --build

Enable Persistent Storage

By default, data is stored in memory. To persist data across restarts:

Edit server.py and change this line:

conn = duckdb.connect(':memory:')

To:

conn = duckdb.connect('/app/data/database.db')

Restart the server. Your data will now persist!

Add File Size Limits

Edit server.py and add a size check to the upload_file function:

@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB
    
    content = await file.read()
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail="File too large")
    
    # ... rest of existing code

Add Authentication

For production use, add API key authentication:

from fastapi import Depends, Header, HTTPException

API_KEY = "your-secret-key"

async def verify_api_key(x_api_key: str = Header(...)):
    if x_api_key != API_KEY:
        raise HTTPException(status_code=403, detail="Invalid API key")

# Add to endpoints
@app.post("/upload", dependencies=[Depends(verify_api_key)])
async def upload_file(file: UploadFile = File(...)):
    # ... existing code
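
To verify the key is enforced, here is a quick client-side check (a sketch using Python requests; it assumes you added the dependency to /query as well and uses the placeholder key above):

import requests

# FastAPI maps the x_api_key parameter to the X-API-Key request header.
resp = requests.post(
    "http://localhost:8000/query",
    json={"sql": "SELECT 1"},
    headers={"X-API-Key": "your-secret-key"},
)
print(resp.status_code, resp.json())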

πŸ—οΈ How It Works

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Upload File    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Auto-Detect Format     β”‚ ← CSV, JSON, Excel, Parquet, Avro
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Route to Parser       β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ Try: Polars (fast)      β”‚ ← 5-10x faster than Pandas
β”‚ Fallback: Pandas        β”‚ ← Handles edge cases
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Load into DuckDB       β”‚ ← In-memory SQL engine
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Query via REST API     β”‚ ← SQL queries, instant results
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Key Design Decisions:

  1. Polars First: 5-10x faster for most operations
  2. Pandas Fallback: Broader compatibility for edge cases
  3. DuckDB: Fast analytical queries without database setup
  4. In-Memory: Zero configuration, instant queries
  5. REST API: Easy integration with any language or tool
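
A minimal sketch of that pipeline in Python, assuming extension-based detection and DuckDB's register() API (illustrative only; the actual server.py may differ in details):

import duckdb
import pandas as pd
import polars as pl
from pathlib import Path

conn = duckdb.connect(":memory:")

POLARS_READERS = {".csv": pl.read_csv, ".parquet": pl.read_parquet, ".json": pl.read_json}
PANDAS_READERS = {".csv": pd.read_csv, ".parquet": pd.read_parquet, ".json": pd.read_json}

def load_table(path: str) -> str:
    """Detect the format by extension, try Polars first, fall back to Pandas, register in DuckDB."""
    p = Path(path)
    table_name = p.stem
    try:
        df = POLARS_READERS[p.suffix.lower()](path).to_pandas()   # Polars: usually faster
    except Exception:
        df = PANDAS_READERS[p.suffix.lower()](path)               # Pandas: broader compatibility
    conn.register(table_name, df)                                 # expose the DataFrame to SQL
    return table_name

load_table("sample.csv")
print(conn.execute("SELECT * FROM sample LIMIT 5").fetchall())

Registering the DataFrame as a DuckDB view is what lets later /query calls run plain SQL against it without any schema setup.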

πŸ› Troubleshooting

Port Already in Use

Error: Bind for 0.0.0.0:8000 failed: port is already allocated

Solution:

# Check what's using port 8000
lsof -i :8000

# Kill the process or change port in docker-compose.yml

Permission Denied

Error: Permission denied when accessing uploads directory

Solution:

chmod -R 777 uploads/

Container Won't Start

Solution:

# Check logs for detailed error
docker-compose logs

# Rebuild from scratch
docker-compose down
docker-compose up --build --force-recreate

Out of Memory

Error: Container crashes with large files

Solutions:

  1. Increase Docker memory: Docker Desktop → Settings → Resources → Memory
  2. Use persistent storage instead of in-memory (see Configuration section)
  3. Process large files in chunks
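
For option 3, a small client-side sketch that splits a large CSV before upload (file names and chunk size are illustrative):

import pandas as pd

# Read the large file in chunks and write each chunk as its own CSV to upload separately.
for i, chunk in enumerate(pd.read_csv("large_dataset.csv", chunksize=500_000)):
    chunk.to_csv(f"large_dataset_part{i}.csv", index=False)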

Tests Fail with "Connection Refused"

Solution: Server takes time to start. Wait longer:

sleep 10 && ./test.sh

Upload Fails with Specific Format

Solution: Check format is supported. For unsupported formats:

  1. Convert to CSV/JSON first
  2. Open an issue on GitHub for format support request
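
For option 1, a quick one-off conversion with pandas usually suffices (a sketch; data.xml and pd.read_xml stand in for whatever source pandas can read):

import pandas as pd

# Convert an unsupported source to CSV, then upload the CSV as usual.
df = pd.read_xml("data.xml")
df.to_csv("data.csv", index=False)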

πŸ“ Project Structure

mcp-data-server/
β”œβ”€β”€ Dockerfile              # Container definition
β”œβ”€β”€ docker-compose.yml      # Service orchestration
β”œβ”€β”€ requirements.txt        # Python dependencies
β”œβ”€β”€ server.py              # Main application
β”œβ”€β”€ test.sh                # Automated tests
β”œβ”€β”€ .gitignore             # Git exclusions
β”œβ”€β”€ README.md              # This file
β”œβ”€β”€ uploads/               # (created on first run)
└── data/                  # (created on first run)

Key Files Explained

Dockerfile
Defines the Python 3.11 environment with all system dependencies.

docker-compose.yml
Single-service setup with health checks and volume mounts for data persistence.

requirements.txt
All Python dependencies with pinned versions:

  • FastAPI: REST API framework
  • Polars: Fast data processing
  • Pandas: Data compatibility layer
  • DuckDB: In-memory SQL engine
  • Format libraries: openpyxl, pyarrow, xlrd

server.py
Main application with:

  • Format auto-detection logic
  • Smart routing (Polars → Pandas)
  • DuckDB integration
  • REST endpoints

test.sh
Automated test suite covering all features.


🚀 Production Deployment Checklist

Before deploying to production:

  • Authentication: Add API key or OAuth
  • Rate Limiting: Prevent abuse
  • Persistent Storage: Use DuckDB file storage
  • File Validation: Check file types and sizes
  • Monitoring: Add logging and metrics
  • CORS: Configure for web clients
  • SSL/TLS: Use HTTPS
  • Backups: Regular data backups
  • Load Balancing: For high traffic
  • Environment Variables: For secrets management
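
For example, the hard-coded key from the Authentication snippet can be read from the environment instead (a minimal sketch; the variable name MCP_API_KEY is an assumption):

import os
from fastapi import Header, HTTPException

# Load the key from the environment; MCP_API_KEY is an assumed variable name.
API_KEY = os.environ.get("MCP_API_KEY", "")

async def verify_api_key(x_api_key: str = Header(...)):
    if not API_KEY or x_api_key != API_KEY:
        raise HTTPException(status_code=403, detail="Invalid API key")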

💡 Common Use Cases

1. Self-Service Analytics for Business Users

  • Analysts upload their own data files
  • Query without waiting for engineering
  • No format conversion needed

2. LLM Data Pipeline

  • Upload datasets once
  • LLM generates SQL queries
  • Conversational data exploration

3. Data Integration Testing

  • Upload test data in any format
  • Validate transformations
  • Quick iteration

4. Rapid Prototyping

  • Experiment with different data sources
  • No database setup required
  • Instant feedback

5. Data Quality Checks

  • Upload production exports
  • Run validation queries
  • Identify issues quickly

🤝 Contributing

Found a bug? Want a feature?

  1. Check existing issues on GitHub
  2. Fork the repository
  3. Create a feature branch
  4. Make your changes
  5. Add tests if applicable
  6. Submit a pull request

📄 License

MIT License - Use it, modify it, ship it to production.

See LICENSE file for full details.


🆘 Support


🎯 What's Next?

This MCP server is a foundation. Extend it:

  • Add streaming data support
  • Connect to cloud storage (S3, GCS, Azure Blob)
  • Build a web UI for non-technical users
  • Add more data sources (APIs, databases)
  • Integrate with data warehouses
  • Add data transformation capabilities
  • Support for more file formats
  • Implement caching for repeated queries