Pipeline Python per l'estrazione, trasformazione e caricamento (ETL) di dati CSV relativi a consumazioni in mense universitarie. Il progetto unisce file CSV mensili, pulisce i dati, applica trasformazioni e li carica in un database SQL tramite SQLAlchemy.
- Caratteristiche
- Struttura del Progetto
- Schema Dati
- Installazione
- Utilizzo
- Test
- Scelte Implementative
- Extract: Lettura e unione di file CSV con separatore
;e codifica UTF-8 - Transform:
- Pulizia dati (gestione valori nulli, correzione valori fuori range)
- Normalizzazione stringhe
- Applicazione filtri (solo consumazioni attive)
- Aggiunta campi calcolati (mese di riferimento)
- Load: Caricamento dati in database SQL tramite SQLAlchemy
- Logging: Sistema di logging completo per tracciare tutte le operazioni
- Error Handling: Gestione robusta degli errori di decodifica e file incompleti
- Test Unitari: Suite di test per validare il funzionamento dei moduli
etl-with-python/
│
├── data/ # Directory con file CSV di input
│ ├── consumazioni_202501.csv
│ ├── consumazioni_202502.csv
│ └── consumazioni_202503.csv
│
├── src/ # Codice sorgente della pipeline
│ ├── __init__.py
│ ├── config.py # Configurazione generale
│ ├── logger.py # Setup del logging
│ ├── models.py # Modelli SQLAlchemy
│ ├── extractor.py # Estrazione e unione CSV
│ ├── cleaner.py # Pulizia e trasformazione
│ ├── loader.py # Caricamento nel database
│ └── pipeline.py # Orchestrazione pipeline completa
│
├── tests/ # Test unitari
│ ├── __init__.py
│ ├── test_extractor.py
│ ├── test_cleaner.py
│ ├── test_loader.py
│ └── run_tests.py
│
├── sql/ # Query SQL di esempio
│ └── queries_example.sql
│
├── main.py # Script CLI principale
├── requirements.txt # Dipendenze Python
└── README.md # Questo file
File CSV con 38 colonne originali separate da ;, codifica UTF-8. Campi principali:
DATA_REG_CASSA: timestamp registrazione cassaUTENTE_ID: ID numerico dell'utentePRESIDIO: sede della mensaRISTORANTE_PRESIDIO: presidio del ristoranteRISTORANTE: nome del ristoranteACQUISTO: tipo di pastoPREZZO_TOTALE: prezzo in euroCOSTO_TOTALE: costo in euroVALORE_PASTO: valore del pasto in euroSTATUS: stato consumazione (A=attivo, C=cancellato)- Altri campi amministrativi e anagrafici
Tabella consumazioni_cleaned:
| Colonna | Tipo | Descrizione |
|---|---|---|
id |
Integer | Chiave primaria auto-incrementale |
DATA_REG_CASSA |
DateTime | Data e ora della registrazione alla cassa |
UTENTE_ID |
Float | ID numerico dell'utente |
presidio |
String | Presidio/sede della mensa |
ristorante_presidio |
String | Presidio del ristorante |
ristorante |
String | Nome del ristorante |
prezzo_totale |
Float | Prezzo totale del pasto in euro |
costo_totale |
Float | Costo totale del pasto in euro |
valore_pasto |
Float | Valore del pasto in euro |
tipo_pasto |
String | Tipo di pasto acquistato |
mese_rif |
Integer | Mese di riferimento (1-12, calcolato) |
anno_rif |
Integer | Anno di riferimento (calcolato) |
Campi calcolati:
mese_rif: Estratto daDATA_REG_CASSA, utile per analisi temporali e report mensilianno_rif: Estratto daDATA_REG_CASSA, utile per analisi temporali e report annuali
- Python 3.8 o superiore
- pip (gestore pacchetti Python)
-
Clone o scarica il progetto
-
Crea un ambiente virtuale (consigliato)
python -m venv venv # Windows venv\Scripts\activate # Linux/Mac source venv/bin/activate
-
Installa le dipendenze
pip install -r requirements.txt
pandas>=2.0.0: manipolazione e analisi datisqlalchemy>=2.0.0: ORM per database SQLpytest>=7.0.0: framework per test unitari
python main.pyQuesto comando:
- Legge tutti i file CSV nella directory
data/con patternconsumazioni_*.csv - Unisce i dati in un unico dataset
- Pulisce e trasforma i dati
- Carica i dati nel database SQLite
consumazioni.db - Mostra statistiche sui dati processati
# Specifica directory dati diversa
python main.py --data-dir ./my_data
# Usa pattern diverso per i CSV
python main.py --pattern "*.csv"
# Esporta anche in CSV i dati puliti
python main.py --export output_cleaned.csv
# Abilita logging dettagliato
python main.py --verbose
# Mostra aiuto
python main.py --helpPer default viene usato SQLite locale. Per usare un database diverso, imposta la variabile d'ambiente DATABASE_URL:
# PostgreSQL
export DATABASE_URL="postgresql://user:password@localhost/dbname"
# MySQL
export DATABASE_URL="mysql+pymysql://user:password@localhost/dbname"
python main.pyNel file sql/queries_example.sql sono disponibili query SQL di esempio per analizzare i dati:
- Conteggio consumazioni per presidio
- Analisi temporale per mese
- Top tipi di pasto più venduti
- Analisi utenti frequenti
- Distribuzione prezzi
Esempio:
# SQLite
sqlite3 consumazioni.db < sql/queries_example.sql# Esegui tutti i test con pytest
python -m pytest
# Esegui con output dettagliato
python -m pytest -v
# Esegui test di un singolo modulo
python -m pytest tests/test_extractor.py
python -m pytest tests/test_cleaner.py
python -m pytest tests/test_loader.py
# Esegui un test specifico
python -m pytest tests/test_cleaner.py::test_convert_datetimeI test coprono:
- Extractor: lettura CSV, gestione errori, unione file
- Cleaner: conversione tipi, gestione nulli, normalizzazione, filtri, campi calcolati
- Loader: connessione database, caricamento dati, statistiche
Il progetto è organizzato in moduli separati per:
- Separazione delle responsabilità: ogni modulo ha un compito specifico
- Testabilità: facilita la scrittura di test unitari
- Manutenibilità: modifiche isolate senza impatti globali
- Riusabilità: i moduli possono essere usati indipendentemente
-
Decodifica file:
- Tentativo primario con UTF-8
- Fallback su latin-1 se UTF-8 fallisce
- Log dettagliati degli errori
-
Colonne mancanti:
- Warning per colonne mancanti o aggiuntive
- Continua elaborazione con colonne disponibili
-
Valori nulli:
- Campi prezzo (
PREZZO_TOTALE,COSTO_TOTALE,VALORE_PASTO) nulli → 0.0 (pasti gratuiti o valori mancanti) UTENTE_IDnulli → 0.0- Rimozione righe con campi critici nulli (data, presidio, utente_id)
- Campi prezzo (
-
Valori fuori range:
- Prezzi < 0 → 0.0
- Prezzi > 50 → 50.0 (valore massimo configurabile)
-
Filtro STATUS='A':
- Considera solo consumazioni attive
- Motivazione: escludere transazioni annullate o non valide
-
Normalizzazione stringhe:
- Rimozione spazi in eccesso
- Conversione a maiuscolo
- Motivazione: uniformità per analisi e aggregazioni
-
Campi calcolati:
mese_rif: Estratto daDATA_REG_CASSA(facilita query e report per periodo mensile)anno_rif: Estratto daDATA_REG_CASSA(facilita query e report per periodo annuale)
- Chunk loading: caricamento database in batch di 1000 righe
- Low memory mode: lettura CSV ottimizzata per file grandi
- Index database: indici su
DATA_REG_CASSA,presidio,mese_rif,anno_rif
Sistema di logging su 3 livelli che scrive sia su console che su file:
- INFO: operazioni principali e statistiche
- WARNING: problemi non critici (colonne mancanti, valori non convertibili)
- ERROR: errori bloccanti
I log vengono salvati nella directory logs/ con nome file etl_pipeline_YYYYMMDD.log (un file per giorno). Questo permette di:
- Tracciabilità: storico completo delle esecuzioni
- Debug: analisi post-esecuzione di errori
- Audit: verifica delle operazioni effettuate
- Monitoraggio: identificazione di pattern di errori
Possibili miglioramenti:
- Supporto per formati aggiuntivi (Excel, JSON)
- Dashboard di visualizzazione dati
- Scheduler per esecuzione periodica
- API REST per interrogazione dati
- Report automatici via email
Questo progetto è fornito come esempio didattico.
Progetto ETL Pipeline - 2025
Extract, transform, load with Python