diff --git a/dags/data_pipeline.py b/dags/data_pipeline.py
new file mode 100644
index 0000000..bbbd932
--- /dev/null
+++ b/dags/data_pipeline.py
@@ -0,0 +1,457 @@
+"""
+Airflow DAG for Tasks 1-3:
+- Task 1: build detailed_orders daily (order-grain denormalized table with JSONB line items)
+- Task 2: build order_tracking daily (status-level aggregate snapshot)
+- Task 3: build predictions daily (simple weighted-average forecasting outputs)
+
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+
+POSTGRES_CONN_ID = "postgres_data"
+
+# Single source of truth for the order statuses tracked in `order_tracking`;
+# the refresh SQL and its validation are both derived from this tuple.
+ORDER_STATUSES = ("pending", "processing", "shipped", "completed")
+# Trusted constants rendered as a SQL literal list: 'pending', 'processing', ...
+ORDER_STATUSES_SQL = ", ".join(f"'{status}'" for status in ORDER_STATUSES)
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "email_on_failure": False,
+    "email_on_retry": False,
+    "retries": 2,
+    "retry_delay": timedelta(minutes=2),
+    "start_date": datetime(2024, 1, 1),
+}
+
+
+def validate_counts_equal(source_sql: str, target_sql: str, label: str) -> None:
+    """
+    Generic validation helper: compares two COUNT(*) queries and fails the task if they differ.
+
+    Args:
+        source_sql: Query whose first column of the first row is the expected count.
+        target_sql: Query whose first column of the first row is the actual count.
+        label: Name used in the printed log line and in the error message.
+
+    Raises:
+        ValueError: If the two counts are not equal.
+    """
+    hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
+    source_count = hook.get_first(source_sql)[0]
+    target_count = hook.get_first(target_sql)[0]
+
+    print(f"[VALIDATION] {label}: source_count={source_count}, target_count={target_count}")
+
+    if source_count != target_count:
+        raise ValueError(
+            f"Validation failed for {label}: source_count ({source_count}) != target_count ({target_count})"
+        )
+
+
+def validate_order_tracking() -> None:
+    """
+    Validations for order_tracking:
+    - No unknown (or NULL) statuses exist in source `orders`
+    - Sum(order_count) equals COUNT(*) from orders
+    - Table contains exactly one row per status in ORDER_STATUSES
+
+    The unknown-status check runs first: `refresh_order_tracking` only counts orders
+    whose status is in ORDER_STATUSES, so an unknown status normally makes the totals
+    differ as well, and checking the totals first would hide the root cause.
+
+    Raises:
+        ValueError: If any of the checks above fails.
+    """
+    hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
+
+    source_count = hook.get_first("SELECT COUNT(*) FROM orders;")[0]
+    tracked_count = hook.get_first("SELECT COALESCE(SUM(order_count), 0) FROM order_tracking;")[0]
+    status_count = hook.get_first("SELECT COUNT(*) FROM order_tracking;")[0]
+    # `NOT IN` yields NULL (not TRUE) for a NULL status, so NULLs need an explicit test.
+    unknown_statuses = hook.get_records(
+        f"""
+        SELECT status
+        FROM orders
+        WHERE status IS NULL
+           OR status NOT IN ({ORDER_STATUSES_SQL})
+        GROUP BY status;
+        """
+    )
+
+    print(
+        f"[VALIDATION] order_tracking: orders_count={source_count}, "
+        f"tracked_sum={tracked_count}, status_rows={status_count}"
+    )
+
+    if unknown_statuses:
+        # A NULL status comes back as None, which str.join() would reject.
+        unknown_values = ", ".join(
+            "NULL" if row[0] is None else str(row[0]) for row in unknown_statuses
+        )
+        raise ValueError(
+            "Validation failed: found unknown source order statuses: "
+            f"{unknown_values}"
+        )
+
+    if source_count != tracked_count:
+        raise ValueError(
+            f"Validation failed: orders count ({source_count}) != SUM(order_tracking.order_count) ({tracked_count})"
+        )
+
+    if status_count != len(ORDER_STATUSES):
+        raise ValueError(
+            f"Validation failed: expected {len(ORDER_STATUSES)} statuses, found {status_count}"
+        )
+
+
+def validate_predictions() -> None:
+    """
+    Validations for predictions:
+    - Contains 1 monthly prediction + 1 per distinct non-NULL product category
+    - monthly_orders_prediction exists exactly once
+    - No negative amounts
+
+    Raises:
+        ValueError: If any of the checks above fails.
+    """
+    hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
+
+    expected_count = hook.get_first("SELECT COUNT(DISTINCT category) + 1 FROM products;")[0]
+    prediction_count = hook.get_first("SELECT COUNT(*) FROM predictions;")[0]
+    monthly_exists = hook.get_first(
+        "SELECT COUNT(*) FROM predictions WHERE name = 'monthly_orders_prediction';"
+    )[0]
+    negative_count = hook.get_first("SELECT COUNT(*) FROM predictions WHERE amount < 0;")[0]
+
+    print(
+        "[VALIDATION] predictions: "
+        f"expected_rows={expected_count}, actual_rows={prediction_count}, "
+        f"monthly_exists={monthly_exists}, negative_rows={negative_count}"
+    )
+
+    if prediction_count != expected_count:
+        raise ValueError(
+            f"Validation failed: expected {expected_count} prediction rows, found {prediction_count}"
+        )
+
+    if monthly_exists != 1:
+        raise ValueError("Validation failed: monthly_orders_prediction is missing or duplicated")
+
+    if negative_count > 0:
+        raise ValueError(f"Validation failed: found {negative_count} negative prediction amounts")
+
+
+with DAG(
+    dag_id="ecommerce_data_pipeline",
+    default_args=default_args,
+    description="Create and refresh detailed_orders, order_tracking, and predictions",
+    schedule="@daily",
+    catchup=False,
+    max_active_runs=1,
+    tags=["technical-test", "postgres"],
+) as dag:
+    dag.doc_md = """
+
+    This DAG performs three daily tasks:
+
+    1. Builds `detailed_orders` (denormalized order-level table).
+       - Grain: 1 row per order
+       - `items` is a JSONB array of order line items
+       - `order_date_epoch` stored as unix epoch seconds
+
+    2. Builds `order_tracking` (status-level aggregate snapshot).
+       - Grain: 1 row per status
+
+    3. Builds `predictions` (simple forecasting outputs).
+       - Monthly orders: weighted average of last two observed months (70/30)
+       - Category units: same weighted average per category
+
+    All three tables are built using an idempotent full-refresh pattern (TRUNCATE + INSERT).
+    Each stage includes validation tasks which will fail the DAG if inconsistencies are detected.
+    """
+
+    # --------------- Connection check like the sample DAG ---------------
+    check_postgres_connection = PostgresOperator(
+        task_id="check_postgres_connection",
+        postgres_conn_id=POSTGRES_CONN_ID,
+        sql="SELECT 1;",
+        doc_md="Verifies the configured Airflow connection `postgres_data` can query PostgreSQL.",
+    )
+
+    # ---------------------- Task 1: detailed_orders ----------------------
+    create_detailed_orders_table = PostgresOperator(
+        task_id="create_detailed_orders_table",
+        postgres_conn_id=POSTGRES_CONN_ID,
+        sql="""
+            CREATE TABLE IF NOT EXISTS detailed_orders (
+                order_id BIGINT PRIMARY KEY,
+                customer_id BIGINT NOT NULL,
+                order_date_epoch BIGINT NOT NULL,
+                total_amount NUMERIC(10, 2) NOT NULL,
+                status VARCHAR(20) NOT NULL,
+                items JSONB NOT NULL
+            );
+        """,
+        doc_md="""
+        Creates the `detailed_orders` table if it does not exist.
+
+        **Grain:** 1 row per order (`order_id` primary key).
+        **items:** JSONB array of line items containing product_id, product_name, quantity, price.
+        """,
+    )
+
+    refresh_detailed_orders = PostgresOperator(
+        task_id="refresh_detailed_orders",
+        postgres_conn_id=POSTGRES_CONN_ID,
+        sql="""
+            BEGIN;
+            TRUNCATE TABLE detailed_orders;
+
+            INSERT INTO detailed_orders (
+                order_id,
+                customer_id,
+                order_date_epoch,
+                total_amount,
+                status,
+                items
+            )
+            SELECT
+                o.id AS order_id,
+                o.customer_id,
+                EXTRACT(EPOCH FROM o.order_date)::BIGINT AS order_date_epoch,
+                o.total_amount,
+                o.status,
+                COALESCE(
+                    jsonb_agg(
+                        jsonb_build_object(
+                            'product_id', oi.product_id,
+                            'product_name', p.name,
+                            'quantity', oi.quantity,
+                            'price', oi.price
+                        )
+                        ORDER BY oi.id
+                    ) FILTER (WHERE oi.id IS NOT NULL),
+                    '[]'::jsonb
+                ) AS items
+            FROM orders o
+            LEFT JOIN order_items oi ON oi.order_id = o.id
+            LEFT JOIN products p ON p.id = oi.product_id
+            GROUP BY o.id, o.customer_id, o.order_date, o.total_amount, o.status;
+
+            COMMIT;
+        """,
+        doc_md="""
+        Rebuilds `detailed_orders` daily using a full refresh.
+
+        - Idempotent: TRUNCATE + INSERT
+        - Ensures one row per order, with JSONB aggregated items
+        - Stores order date as unix epoch seconds
+        """,
+    )
+
+    validate_detailed_orders_row_count = PythonOperator(
+        task_id="validate_detailed_orders_row_count",
+        python_callable=validate_counts_equal,
+        op_kwargs={
+            "source_sql": "SELECT COUNT(*) FROM orders;",
+            "target_sql": "SELECT COUNT(*) FROM detailed_orders;",
+            "label": "orders_vs_detailed_orders_row_count",
+        },
+        doc_md="Validates that `detailed_orders` has the same number of rows as `orders` (1 row per order).",
+    )
+
+    # ---------------------- Task 2: order_tracking ----------------------
+    create_order_tracking_table = PostgresOperator(
+        task_id="create_order_tracking_table",
+        postgres_conn_id=POSTGRES_CONN_ID,
+        sql="""
+            CREATE TABLE IF NOT EXISTS order_tracking (
+                status VARCHAR(20) PRIMARY KEY,
+                order_count BIGINT NOT NULL,
+                total_value NUMERIC(12, 2) NOT NULL,
+                last_updated TIMESTAMPTZ NOT NULL
+            );
+        """,
+        doc_md="""
+        Creates the `order_tracking` table if it does not exist.
+
+        **Grain:** 1 row per status.
+        Stores counts and total order value per status, plus load timestamp (`last_updated`).
+        """,
+    )
+
+    refresh_order_tracking = PostgresOperator(
+        task_id="refresh_order_tracking",
+        postgres_conn_id=POSTGRES_CONN_ID,
+        sql=f"""
+            BEGIN;
+            TRUNCATE TABLE order_tracking;
+
+            INSERT INTO order_tracking (status, order_count, total_value, last_updated)
+            WITH statuses AS (
+                SELECT unnest(ARRAY[{ORDER_STATUSES_SQL}]) AS status
+            )
+            SELECT
+                s.status,
+                COUNT(o.id)::BIGINT AS order_count,
+                COALESCE(SUM(o.total_amount), 0)::NUMERIC(12, 2) AS total_value,
+                NOW() AS last_updated
+            FROM statuses s
+            LEFT JOIN orders o ON o.status = s.status
+            GROUP BY s.status;
+
+            COMMIT;
+        """,
+        doc_md="""
+        Rebuilds `order_tracking` daily using a full refresh.
+
+        - Includes all 4 statuses even if count is zero
+        - Idempotent: TRUNCATE + INSERT
+        - `last_updated` uses NOW() at load time
+        """,
+    )
+
+    validate_order_tracking_totals = PythonOperator(
+        task_id="validate_order_tracking_totals",
+        python_callable=validate_order_tracking,
+        doc_md="""
+        Validates `order_tracking`:
+        - Source `orders` contains no unknown or NULL statuses
+        - SUM(order_count) equals COUNT(*) from orders
+        - Table contains exactly 4 status rows
+        """,
+    )
+
+    # ---------------------- Task 3: predictions ----------------------
+    create_predictions_table = PostgresOperator(
+        task_id="create_predictions_table",
+        postgres_conn_id=POSTGRES_CONN_ID,
+        sql="""
+            CREATE TABLE IF NOT EXISTS predictions (
+                name VARCHAR(100) PRIMARY KEY,
+                amount NUMERIC(12, 2) NOT NULL
+            );
+        """,
+        doc_md="""
+        Creates the `predictions` table if it does not exist.
+
+        **Grain:** 1 row per prediction name.
+        """,
+    )
+
+    refresh_predictions = PostgresOperator(
+        task_id="refresh_predictions",
+        postgres_conn_id=POSTGRES_CONN_ID,
+        sql="""
+            BEGIN;
+            TRUNCATE TABLE predictions;
+
+            INSERT INTO predictions (name, amount)
+            WITH monthly_orders AS (
+                SELECT date_trunc('month', order_date)::date AS month_start,
+                       COUNT(*)::NUMERIC AS metric
+                FROM orders
+                GROUP BY 1
+            ),
+            ranked_orders AS (
+                SELECT month_start, metric,
+                       ROW_NUMBER() OVER (ORDER BY month_start DESC) AS rn
+                FROM monthly_orders
+            ),
+            orders_prediction AS (
+                SELECT
+                    'monthly_orders_prediction'::VARCHAR AS name,
+                    ROUND(
+                        GREATEST(
+                            0,
+                            COALESCE(MAX(CASE WHEN rn = 1 THEN metric END), 0) * 0.7 +
+                            COALESCE(
+                                MAX(CASE WHEN rn = 2 THEN metric END),
+                                COALESCE(MAX(CASE WHEN rn = 1 THEN metric END), 0)
+                            ) * 0.3
+                        ),
+                        0
+                    )::NUMERIC(12, 2) AS amount
+                FROM ranked_orders
+            ),
+            monthly_category_units AS (
+                SELECT date_trunc('month', o.order_date)::date AS month_start,
+                       p.category,
+                       SUM(oi.quantity)::NUMERIC AS metric
+                FROM orders o
+                JOIN order_items oi ON oi.order_id = o.id
+                JOIN products p ON p.id = oi.product_id
+                GROUP BY 1, 2
+            ),
+            ranked_category_units AS (
+                SELECT month_start, category, metric,
+                       ROW_NUMBER() OVER (PARTITION BY category ORDER BY month_start DESC) AS rn
+                FROM monthly_category_units
+            ),
+            categories AS (
+                SELECT DISTINCT category FROM products
+                WHERE category IS NOT NULL
+            ),
+            category_predictions AS (
+                SELECT
+                    'category_' || c.category || '_prediction' AS name,
+                    ROUND(
+                        GREATEST(
+                            0,
+                            COALESCE(MAX(CASE WHEN rcu.rn = 1 THEN rcu.metric END), 0) * 0.7 +
+                            COALESCE(
+                                MAX(CASE WHEN rcu.rn = 2 THEN rcu.metric END),
+                                COALESCE(MAX(CASE WHEN rcu.rn = 1 THEN rcu.metric END), 0)
+                            ) * 0.3
+                        ),
+                        0
+                    )::NUMERIC(12, 2) AS amount
+                FROM categories c
+                LEFT JOIN ranked_category_units rcu ON rcu.category = c.category
+                GROUP BY c.category
+            )
+            SELECT name, amount FROM orders_prediction
+            UNION ALL
+            SELECT name, amount FROM category_predictions;
+
+            COMMIT;
+        """,
+        doc_md="""
+        Rebuilds `predictions` daily using a full refresh.
+
+        Method:
+        - Monthly orders prediction: weighted average of the last two observed months (70% most recent, 30% previous)
+        - Category unit predictions: same method applied per category
+        - Products with a NULL category are skipped (a NULL name cannot be stored in the primary key)
+        Values are clipped at 0 and rounded to whole numbers.
+        """,
+    )
+
+    validate_predictions_task = PythonOperator(
+        task_id="validate_predictions",
+        python_callable=validate_predictions,
+        doc_md="Validates row counts, presence of monthly prediction, and non-negative predicted amounts.",
+    )
+
+    # ---------------------- Dependencies ----------------------
+    # Keep the three branches independent after connectivity check (fast, clean).
+    check_postgres_connection >> [
+        create_detailed_orders_table,
+        create_order_tracking_table,
+        create_predictions_table,
+    ]
+
+    create_detailed_orders_table >> refresh_detailed_orders >> validate_detailed_orders_row_count
+    create_order_tracking_table >> refresh_order_tracking >> validate_order_tracking_totals
+    create_predictions_table >> refresh_predictions >> validate_predictions_task
diff --git a/docker-compose.yml b/docker-compose.yml
index 7615060..0e33468 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -9,7 +9,7 @@ x-airflow-common:
     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres-airflow:5432/airflow
     AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres-airflow:5432/airflow
     AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
-    AIRFLOW__CORE__FERNET_KEY: 'Zp3fB8vK9mN2qR5tY8wC1eH4jL7nP0sU6xA3dF6gI9='
+    AIRFLOW__CORE__FERNET_KEY: '5cjz_vWeQ6EXGbV_TmQV0BwFU2pmfQp0AndcoBoFhe0='
     AIRFLOW__WEBSERVER__SECRET_KEY: 'a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0u1v2w3x4y5z6'
     AIRFLOW__WEBSERVER__SESSION_LIFETIME_MINUTES: '43200'
     AIRFLOW__WEBSERVER__COOKIE_SECURE: 'False'
@@ -173,7 +173,7 @@ services:
         mkdir -p /sources/logs /sources/dags /sources/plugins
         chown -R "${AIRFLOW_UID:-50000}:0" /sources/logs /sources/plugins
         chmod -R 755 /sources/dags
-        exec /entrypoint airflow version
+        /entrypoint airflow version
         airflow db init
         airflow users create \
           --username admin \
@@ -181,7 +181,7 @@ services:
           --lastname User \
           --role Admin \
           --email admin@example.com \
-          --password admin
+          --password admin || echo "User already exists"
         # Add PostgreSQL connection
         airflow connections add 'postgres_data' \
           --conn-type 'postgres' \
@@ -203,6 +203,7 @@ services:
   # Superset
   superset:
     image: apache/superset:3.0.0
+    platform: linux/amd64
     container_name: superset
     environment:
       - SUPERSET_SECRET_KEY=thisISaSECRET_1234567890abcdefghijklmnopqrstuvwxyz
diff --git a/screenshots/dashboard.png b/screenshots/dashboard.png
new file mode 100644
index 0000000..7777517
Binary files /dev/null and b/screenshots/dashboard.png differ