diff --git a/DHIS2/file_garbage_remover/Readme.md b/DHIS2/file_garbage_remover/Readme.md new file mode 100644 index 00000000..e1394aa2 --- /dev/null +++ b/DHIS2/file_garbage_remover/Readme.md @@ -0,0 +1,177 @@ +# DHIS2 File Garbage Remover Scripts + +This folder contains two Python scripts designed to manage orphaned file resources in DHIS2 instances. +These scripts identify files with no database references and take the appropriate action depending on the environment: production (tomcat) or testing (docker). + +Included scripts: + +## main.py (recommended entry point) + +Run cleanup (tomcat or docker) and optionally send a notification in one command. Dry-run is the default unless you add `--force`. + +### Example (docker cleanup + notify using webhook from config.json) +``` +python3 main.py \ + --mode docker \ + --docker-instance docker.eyeseetea.com/widpit/dhis2-data:2.41-widp-dev-test \ + --csv-path /tmp/fg_docker.csv \ + --config /path/to/config.json \ + --notify-title "Orphans widp-dev-test" +``` +- Add `--force` to actually move/delete the files (tomcat) or delete them inside the container (docker). +- Add `--notify-test` to print the payload instead of sending it. +- To notify only (no cleanup): `python3 main.py --notify-only --csv-path /tmp/fg.csv --config /path/to/config.json --notify-title "..." [--notify-test]` +- To skip sending and record the CSV entries as already notified: `--save-all-as-notified`. + +### Example (tomcat cleanup, dry-run, save as notified) +``` +DB_PASSWORD_FG=your_password python3 main.py \ + --config /path/to/config.json \ + --csv-path /tmp/fg_tomcat.csv \ + --save-all-as-notified +``` +Add `--force` to actually move the files and delete the corresponding database entries. + +config.json needs the cleanup settings plus the webhook (if you want notifications): +``` +{ + "db_host": "", + "db_port": "", + "db_name": "", + "db_user": "", + "file_base_path": "", + "temp_file_path": "", + "webhook-url": "https://your.webhook.url", + "notify-http-proxy": "http://openproxy.who.int:8080", + "notify-https-proxy": "http://openproxy.who.int:8080" +} +``` + +## file_garbage_remover_tomcat.py + +### Description: + +Designed for production environments. Identifies orphaned file resources (documents, or data values such as files or images attached to a value), moves the files to a temporary directory, and archives the corresponding database entries into a dedicated table (`fileresourcesaudit`) before deleting them from the original `fileresource` table. + +### Usage: + +Run the script in either test or force mode. + +Test Mode (dry-run): + +``` +export DB_PASSWORD_FG='your_password' +./file_garbage_remover_tomcat.py --test --config /path/to/config.json +``` + +Force Mode (apply changes): + +``` +export DB_PASSWORD_FG='your_password' +./file_garbage_remover_tomcat.py --force --config /path/to/config.json +``` + +config.json File Requirements: +``` +{ + "db_host": "", + "db_port": "", + "db_name": "", + "db_user": "", + "file_base_path": "", + "temp_file_path": "" +} +``` + +Ensure file_base_path and temp_file_path exist and are valid directories. + +The database password must be provided through the environment variable DB_PASSWORD_FG. + +Bash wrapper example: +``` +#!/bin/bash +MODE="${1:-test}" +[[ "$MODE" != "test" && "$MODE" != "force" ]] && echo "Invalid mode."
&& exit 1 + +DB_PASSWORD_FG=db_password + +python3 /path/to/script/bin/file_garbage_remover/file_garbage_remover_tomcat.py --config /path/to/script/bin/file_garbage_remover/config.json --$MODE 2>&1 | tee -a /path/to/logs/orphan_cleanup.log + +unset DB_PASSWORD_FG +``` + +## file_garbage_remover_docker.py + +### Description: + +Intended for d2-docker testing environments. +Identifies orphaned files in a Dockerized DHIS2 instance and directly deletes them from the container. This script does not archive or move files, permanently removing identified resources. + +### Usage: + +Run the script specifying the Docker DHIS2 instance: + +``` +./file_garbage_remover_docker.py --instance docker.eyeseetea.com/project/dhis2-data:2.41-test +``` + +### Operation: + +Executes an SQL query within the DHIS2 container using d2-docker run-sql. + +Copies the generated file list into the container. + +Deletes identified files directly inside the container. + +# Precautions + +Production Environments: Always use --test mode before using --force to verify intended changes. + +Docker/Test Environments: Files deleted by file_garbage_remover_docker.py cannot be recovered. + +## Restoring fileresource entries (tomcat) + +Manual steps if a resource was moved by mistake: +1) Restore the row into `fileresource` from `fileresourcesaudit` using a **new** `fileresourceid` to avoid collisions. +Column list (stable): `uid, code, created, lastupdated, name, contenttype, contentlength, contentmd5, storagekey, isassigned, domain, userid, lastupdatedby, hasmultiplestoragefiles, fileresourceowner` +``` +INSERT INTO fileresource ( + fileresourceid, uid, code, created, lastupdated, name, contenttype, + contentlength, contentmd5, storagekey, isassigned, domain, userid, + lastupdatedby, hasmultiplestoragefiles, fileresourceowner +) +SELECT + nextval('fileresource_fileresourceid_seq'), uid, code, created, lastupdated, + name, contenttype, contentlength, contentmd5, storagekey, isassigned, + domain, userid, lastupdatedby, hasmultiplestoragefiles, fileresourceowner +FROM fileresourcesaudit +WHERE fileresourceid = ; +``` +Run inside a transaction and verify before commit. + +2) Move the file back from `temp_file_path` to `file_base_path`, keeping the folder (`document` or `dataValue`). 
Use a wildcard to catch image variants (e.g., `..._max`, `..._min`): +``` +mv "//"* "//" +``` + +Example to restore entries moved in the last 24 hours: +``` +-- 1) Reinsert rows with new IDs +INSERT INTO fileresource ( + fileresourceid, uid, code, created, lastupdated, name, contenttype, + contentlength, contentmd5, storagekey, isassigned, domain, userid, + lastupdatedby, hasmultiplestoragefiles, fileresourceowner +) +SELECT + nextval('fileresource_fileresourceid_seq'), uid, code, created, lastupdated, + name, contenttype, contentlength, contentmd5, storagekey, isassigned, + domain, userid, lastupdatedby, hasmultiplestoragefiles, fileresourceowner +FROM fileresourcesaudit +WHERE COALESCE(lastupdated, created, NOW()) >= (NOW() - INTERVAL '24 hours'); + +-- 2) Move files back (adjust paths) +find "" -type f -mtime -1 -print0 | while IFS= read -r -d '' f; do + rel="${f#/}" + mv "$f" "/$rel" +done +``` diff --git a/DHIS2/file_garbage_remover/cleaner.py b/DHIS2/file_garbage_remover/cleaner.py new file mode 100644 index 00000000..47b03d42 --- /dev/null +++ b/DHIS2/file_garbage_remover/cleaner.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +import json +import os +import re +import shutil +import sys +from datetime import datetime + +import subprocess +import psycopg2 + +from common import load_config, log, mark_items_notified +from csv_utils import append_items, deduplicate_items, merge_notified_flags +from sql_queries import ( + SQL_CREATE_TABLE_IF_NOT_EXIST, + SQL_DATA_VALUE_UIDS, + SQL_DELETE_ORIGINAL, + SQL_EVENT_FILE_UIDS, + SQL_FIND_DATA_VALUES_FILE_RESOURCES, + SQL_FIND_ORPHANS_DOCUMENTS, + SQL_INSERT_AUDIT, + SQL_TRACKER_ATTRIBUTE_UIDS, +) + + +def get_events(cursor): + cursor.execute(SQL_EVENT_FILE_UIDS) + return [row[0] for row in cursor.fetchall() if row[0]] + + +def get_event_blob(cursor): + event_texts = get_events(cursor) + return "\n".join(json.dumps(e) for e in event_texts) + + +def get_all_datavalue_uids(cursor): + cursor.execute(SQL_DATA_VALUE_UIDS) + return set(row[0] for row in cursor.fetchall() if row[0]) + + +def get_all_tracker_uids(cursor): + cursor.execute(SQL_TRACKER_ATTRIBUTE_UIDS) + return set(row[0] for row in cursor.fetchall() if row[0]) + + +def get_matching_files(storage_key, base_dir, folder): + storage_key = storage_key.replace(folder + "/", "") + base_dir = os.path.join(base_dir, folder) + if not os.path.isdir(base_dir): + log(f"⚠️ Skipping search: base directory does not exist: {base_dir}") + return [] + matching = [] + for filename in os.listdir(base_dir): + if filename.startswith(storage_key): + fullpath = os.path.join(base_dir, filename) + if os.path.isfile(fullpath): + matching.append(fullpath) + return matching + + +def update_db(fileresourceid, cursor, conn, dry_run=False): + if dry_run: + log("[DRY RUN] Would execute:") + log(f" {SQL_INSERT_AUDIT.strip()}") + log(f" with parameters: {{'fid': '{fileresourceid}'}}") + log(f" {SQL_DELETE_ORIGINAL.strip()} ") + log(f" with parameters: {{'fid': '{fileresourceid}'}}") + else: + try: + cursor.execute(SQL_INSERT_AUDIT, {"fid": fileresourceid}) + cursor.execute(SQL_DELETE_ORIGINAL, {"fid": fileresourceid}) + conn.commit() + log(f"Archived and deleted fileresourceid {fileresourceid}") + except Exception as e: + log(f"❌ Failed DB update for fileresourceid {fileresourceid}: {e}") + conn.rollback() + raise + + +def move_files(file_list, file_base_path, temp_file_path, folder, dry_run): + base_dir = os.path.join(file_base_path, folder) + dest_dir = os.path.join(temp_file_path, folder) + for src in file_list: + rel_path 
= os.path.relpath(src, base_dir) + dst = os.path.join(dest_dir, rel_path) + dst_dir = os.path.dirname(dst) + os.makedirs(dst_dir, exist_ok=True) + log(f"{'[DRY RUN] ' if dry_run else ''}Moving {src} -> {dst}") + if not dry_run: + shutil.move(src, dst) + + +def ensure_audit_table_exists(cursor, dry_run=False): + if dry_run: + log("[DRY RUN] Would execute:") + log(SQL_CREATE_TABLE_IF_NOT_EXIST) + else: + log("Ensuring 'fileresourcesaudit' table exists...") + cursor.execute(SQL_CREATE_TABLE_IF_NOT_EXIST) + log("'fileresourcesaudit' table ready.") + + +def remove_documents(file_base_path, temp_file_path, dry_run, cur, conn, summary): + log("Querying orphaned fileresource entries (documents)...") + cur.execute(SQL_FIND_ORPHANS_DOCUMENTS) + rows = cur.fetchall() + if not rows: + log("✅ No orphaned fileresource entries found (documents).") + return + log(f"Found {len(rows)} \"document\" orphaned entries.") + process_orphan_files(rows, file_base_path, temp_file_path, dry_run, cur, conn, "document", summary) + + +def remove_datavalues(file_base_path, temp_file_path, dry_run, cur, conn, summary): + log("Querying orphaned fileresource entries (data values)...") + cur.execute(SQL_FIND_DATA_VALUES_FILE_RESOURCES) + rows = cur.fetchall() + + datavalue_uids = get_all_datavalue_uids(cur) + event_blob = get_event_blob(cur) + tracker_uids = get_all_tracker_uids(cur) + + orphan_data_values = [] + for fileresourceid, uid, storagekey, name, created in rows: + if uid in datavalue_uids: + continue # Referenced in datavalue + if uid in event_blob: + continue # Referenced in event + if uid in tracker_uids: + continue # Referenced in a tracker attribute + orphan_data_values.append((fileresourceid, storagekey, name, created)) + + if not orphan_data_values: + log("✅ No fileResources entries found (data values).") + return + + log(f"Found {len(orphan_data_values)} \"data_value\" orphaned entries.") + process_orphan_files(orphan_data_values, file_base_path, temp_file_path, dry_run, cur, conn, "dataValue", summary) + + +def process_orphan_files(orphan_data_values, file_base_path, temp_file_path, dry_run, cur, conn, folder, summary): + count = 0 + for entry in orphan_data_values: + fileresourceid, storagekey = entry[0], entry[1] + name = entry[2] if len(entry) > 2 else "" + created = entry[3] if len(entry) > 3 else None + if not fileresourceid: + log(f"⚠️ Skipping row with empty/null fileresourceid: {fileresourceid}") + continue + matches = get_matching_files(storagekey, file_base_path, folder) + if not matches: + log(f"No files found for storage_key: {storagekey} (folder={folder})") + else: + move_files(matches, file_base_path, temp_file_path, folder, dry_run) + update_db(fileresourceid, cur, conn, dry_run) + rel_matches = [os.path.join(folder, os.path.relpath(m, os.path.join(file_base_path, folder))) for m in matches] + summary["items"].append({ + "id": fileresourceid, + "name": name, + "storagekey": storagekey, + "folder": folder, + "files": rel_matches, + "created": created.isoformat() if hasattr(created, "isoformat") else created, + "detection_date": datetime.utcnow().isoformat(), + "action": "would_move_and_delete" if dry_run else "moved_and_deleted", + "notified": False + }) + count += 1 + log(f"{count} file(s) {'would be moved' if dry_run else 'were successfully moved and deleted'}") + + +def emit_summary(summary): + mode = summary.get("mode", "TEST") + items = summary.get("items", []) + log(f"Summary ({mode}): {len(items)} fileresource entries processed.") + for item in items: + files = item.get("files") or [""] 
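+        # When no files were found on disk, fall back to a single empty entry so the loop below still logs one "file:" line per item.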
+ log(f" - id={item.get('id')} name=\"{item.get('name')}\" storagekey={item.get('storagekey')} action={item.get('action')}") + for f in files: + log(f" file: {f}") + + +def run_cleanup(args): + if args.mode == "docker": + run_docker_cleanup(args) + return + + config = load_config(args.config) + required_keys = ["db_host", "db_port", "db_name", "db_user", "file_base_path", "temp_file_path"] + missing_keys = [k for k in required_keys if not config.get(k)] + if missing_keys: + log(f"❌ Missing required config keys: {', '.join(missing_keys)}") + sys.exit(1) + + db_password = os.environ.get("DB_PASSWORD_FG") + if not db_password: + log("❌ Missing required environment variable: DB_PASSWORD_FG") + sys.exit(1) + + db_url = f"postgresql://{config['db_user']}:{db_password}@{config['db_host']}:{config['db_port']}/{config['db_name']}" + file_base_path = config["file_base_path"] + temp_file_path = config["temp_file_path"] + + if not os.path.isdir(file_base_path) or not os.path.isdir(temp_file_path): + log("❌ Both 'file_base_path' and 'temp_file_path' must exist and be directories.") + log(f" file_base_path: {file_base_path} -> {'OK' if os.path.isdir(file_base_path) else 'INVALID'}") + log(f" temp_file_path: {temp_file_path} -> {'OK' if os.path.isdir(temp_file_path) else 'INVALID'}") + sys.exit(1) + + if args.force and args.test: + log("❌ Choose either --force or --test, not both.") + sys.exit(1) + + dry_run = not args.force + log(f"{'Running in TEST mode' if dry_run else 'Running in FORCE mode'}") + log(f"Connecting to database: {db_url}") + log(f"File base path: {file_base_path}") + log(f"Temporary move path: {temp_file_path}") + + summary = {"mode": "TEST" if dry_run else "FORCE", "items": []} + + with psycopg2.connect(dsn=db_url) as conn: + with conn.cursor() as cur: + ensure_audit_table_exists(cur, dry_run) + remove_documents(file_base_path, temp_file_path, dry_run, cur, conn, summary) + remove_datavalues(file_base_path, temp_file_path, dry_run, cur, conn, summary) + + if args.csv_path: + if getattr(args, "save_all_as_notified", False): + mark_items_notified(summary.get("items", [])) + else: + merge_notified_flags(args.csv_path, summary.get("items", []), unique_keys=("id", "uid"), log=log) + items = deduplicate_items(args.csv_path, summary.get("items", []), unique_keys=("id", "uid"), overwrite=False, log=log) + append_items(args.csv_path, items, overwrite=False, log=log) + emit_summary(summary) + + +def run_docker_cleanup(args): + if not args.docker_instance: + log("❌ --docker-instance is required when --mode=docker") + sys.exit(1) + script_path = os.path.join(os.path.dirname(__file__), "file_garbage_remover_docker.py") + if not os.path.isfile(script_path): + log(f"❌ Docker cleanup script not found at {script_path}") + sys.exit(1) + cmd = [sys.executable, script_path, "--instance", args.docker_instance] + if args.csv_path: + cmd.extend(["--csv-path", args.csv_path]) + if getattr(args, "save_all_as_notified", False): + cmd.append("--save-all-as-notified") + if args.force: + cmd.append("--force") + log(f"Running docker cleanup via: {' '.join(cmd)}") + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError as e: + log(f"❌ Docker cleanup failed: {e}") + sys.exit(e.returncode) diff --git a/DHIS2/file_garbage_remover/common.py b/DHIS2/file_garbage_remover/common.py new file mode 100644 index 00000000..8483d941 --- /dev/null +++ b/DHIS2/file_garbage_remover/common.py @@ -0,0 +1,81 @@ +import json +import os +import re +from datetime import datetime + + +def log(message): + """Simple 
logger with timestamp and DB URL password masking.""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + if isinstance(message, str): + message = re.sub( + r"(postgresql://[^:]+:)([^@]+)(@)", + r"\1*****\3", + message + ) + print(f"[{timestamp}] {message}") + + +def load_config(path): + with open(path, "r") as f: + return json.load(f) + + +def mark_items_notified(items): + for item in items: + item["notified"] = True + + +def emit_summary(items, mode, log_fn=print): + log_fn(f"Summary ({mode}): {len(items)} fileresource entries processed.") + for item in items: + files = item.get("files") or [""] + log_fn(f" - id={item.get('id')} name=\"{item.get('name')}\" storagekey={item.get('storagekey')} action={item.get('action')}") + for f in files: + log_fn(f" file: {f}") + + +def _iso(value): + return value.isoformat() if hasattr(value, "isoformat") else value + + +def build_document_items(raw_rows): + items = [] + for fid, uid, storagekey, name, created in raw_rows: + items.append({ + "id": fid, + "uid": uid, + "name": name, + "storagekey": storagekey, + "folder": "document", + "files": [], + "created": _iso(created), + "detection_date": datetime.utcnow().isoformat(), + "action": "would_move_and_delete", + "notified": False, + }) + return items + + +def build_datavalue_items(raw_rows, datavalue_uids, tracker_uids, event_blob): + items = [] + for fid, uid, storagekey, name, created in raw_rows: + if uid in datavalue_uids: + continue + if uid in event_blob: + continue + if uid in tracker_uids: + continue + items.append({ + "id": fid, + "uid": uid, + "name": name, + "storagekey": storagekey, + "folder": "dataValue", + "files": [], + "created": _iso(created), + "detection_date": datetime.utcnow().isoformat(), + "action": "would_move_and_delete", + "notified": False, + }) + return items diff --git a/DHIS2/file_garbage_remover/csv_utils.py b/DHIS2/file_garbage_remover/csv_utils.py new file mode 100644 index 00000000..a365c16e --- /dev/null +++ b/DHIS2/file_garbage_remover/csv_utils.py @@ -0,0 +1,112 @@ +import csv +import os + +DEFAULT_FIELDNAMES = ["id", "uid", "name", "created", "detection_date", "storagekey", "folder", "action", "files", "notified"] + + +def serialize_item(item): + return { + "id": item.get("id"), + "uid": item.get("uid"), + "name": item.get("name"), + "created": item.get("created"), + "detection_date": item.get("detection_date"), + "storagekey": item.get("storagekey"), + "folder": item.get("folder"), + "action": item.get("action"), + "files": "|".join(item.get("files") or []), + "notified": str(item.get("notified", False)).lower(), + } + + +def append_items(csv_path, items, overwrite=False, log=None): + """ + Append serialized items to CSV. If overwrite=True, start fresh. 
+ """ + mode = "w" if overwrite else "a" + file_exists = os.path.isfile(csv_path) + want_header = overwrite or not file_exists + try: + with open(csv_path, mode, newline="") as f: + writer = csv.DictWriter(f, fieldnames=DEFAULT_FIELDNAMES) + if want_header: + writer.writeheader() + for item in items: + writer.writerow(serialize_item(item)) + if log: + log(f"CSV summary {'overwritten' if overwrite else 'appended'} at {csv_path}") + except Exception as e: + if log: + log(f"❌ Failed to write CSV {csv_path}: {e}") + else: + raise + + +def read_rows(csv_path): + with open(csv_path, newline="") as f: + reader = csv.DictReader(f) + fieldnames = reader.fieldnames or DEFAULT_FIELDNAMES + rows = list(reader) + return rows, fieldnames + + +def write_rows(csv_path, rows, fieldnames=None): + fieldnames = fieldnames or DEFAULT_FIELDNAMES + with open(csv_path, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + + +def deduplicate_items(csv_path, new_items, unique_keys=("id", "uid"), overwrite=False, log=None): + """ + Returns new_items filtered to avoid duplicates with existing CSV based on unique_keys. + If overwrite=True, no filtering is applied (CSV will be rewritten). + """ + if overwrite or not os.path.isfile(csv_path): + return new_items + + existing_keys = set() + try: + rows, _ = read_rows(csv_path) + for row in rows: + key = tuple((row.get(k) or "").strip() for k in unique_keys) + existing_keys.add(key) + except Exception as e: + if log: + log(f"⚠️ Could not read existing CSV for deduplication: {e}") + + filtered = [] + for item in new_items: + key = tuple((str(item.get(k)) or "").strip() for k in unique_keys) + if key in existing_keys: + continue + existing_keys.add(key) + filtered.append(item) + return filtered + + +def merge_notified_flags(csv_path, items, unique_keys=("id", "uid"), log=None): + """ + For each item, if an existing CSV has the same unique key with notified=true, + copy that notified flag into the item (unless already true). 
+ """ + if not os.path.isfile(csv_path): + return items + try: + rows, _ = read_rows(csv_path) + except Exception as e: + if log: + log(f"⚠️ Could not read existing CSV for notified merge: {e}") + return items + + notified_map = {} + for row in rows: + key = tuple((row.get(k) or "").strip() for k in unique_keys) + notified_map[key] = str(row.get("notified", "")).lower() == "true" + + for item in items: + key = tuple((str(item.get(k)) or "").strip() for k in unique_keys) + if notified_map.get(key): + item["notified"] = True + return items diff --git a/DHIS2/file_garbage_remover/file_garbage_remover_docker.py b/DHIS2/file_garbage_remover/file_garbage_remover_docker.py new file mode 100644 index 00000000..2d506e54 --- /dev/null +++ b/DHIS2/file_garbage_remover/file_garbage_remover_docker.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 + +import argparse +import os +import subprocess +import sys +import tempfile +from datetime import datetime + +from csv_utils import append_items, deduplicate_items, merge_notified_flags +from common import ( + build_datavalue_items, + build_document_items, + emit_summary, + mark_items_notified, +) +from sql_queries import ( + SQL_CREATE_TABLE_IF_NOT_EXIST, + SQL_DATA_VALUE_UIDS, + SQL_DELETE_ORIGINAL, + SQL_EVENT_FILE_UIDS, + SQL_FIND_DATA_VALUES_FILE_RESOURCES, + SQL_FIND_ORPHANS_DOCUMENTS, + SQL_INSERT_AUDIT, + SQL_TRACKER_ATTRIBUTE_UIDS, +) + + +def run(cmd, capture=False): + kwargs = {"check": True} + if capture: + kwargs["stdout"] = subprocess.PIPE + kwargs["stderr"] = subprocess.PIPE + kwargs["text"] = True + return subprocess.run(cmd, **kwargs) + + +def find_container_id(instance): + slug = "d2-docker-" + instance.replace("/", "-").replace(":", "-").replace(".", "-") + core = slug.replace("dhis2-data-", "") + "-core-1" + result = run(["docker", "ps", "--format", "{{.ID}}", "-f", f"name={core}"], capture=True) + lines = result.stdout.strip().splitlines() + if not lines: + return None + return lines[0] + + +def run_sql(instance, sql): + with tempfile.NamedTemporaryFile("w", delete=False) as f: + f.write(sql) + tmp_path = f.name + try: + result = run(["d2-docker", "run-sql", "-i", instance, tmp_path], capture=True) + return result.stdout + finally: + try: + os.remove(tmp_path) + except OSError: + pass + + +def parse_tab_rows(output, expected_cols): + rows = [] + for line in output.strip().splitlines(): + if not line or line.lower().startswith("fileresourceid") or line.lower().startswith("value") or line.lower().startswith("eventdatavalues"): + continue + if line.startswith("(") and "row" in line: + continue + parts = [p.strip() for p in line.split("|")] + if len(parts) < expected_cols: + continue + rows.append(parts) + return rows + + +def get_orphan_documents(instance): + out = run_sql(instance, SQL_FIND_ORPHANS_DOCUMENTS) + raw_rows = parse_tab_rows(out, 5) + return build_document_items(raw_rows) + + +def get_orphan_datavalues(instance): + data_rows = parse_tab_rows(run_sql(instance, SQL_FIND_DATA_VALUES_FILE_RESOURCES), 5) + datavalue_uids = set(r[0] for r in parse_tab_rows(run_sql(instance, SQL_DATA_VALUE_UIDS), 1)) + tracker_uids = set(r[0] for r in parse_tab_rows(run_sql(instance, SQL_TRACKER_ATTRIBUTE_UIDS), 1)) + event_blob = "\n".join(r[0] for r in parse_tab_rows(run_sql(instance, SQL_EVENT_FILE_UIDS), 1)) + return build_datavalue_items(data_rows, datavalue_uids, tracker_uids, event_blob) + + +def delete_files(container_id, row, dry_run): + base = "/DHIS2_home/files" + storagekey = row["storagekey"] + folder = row["folder"] + prefix = 
os.path.join(base, folder, storagekey.replace(f"{folder}/", "")) + cmd = ["docker", "exec", container_id, "bash", "-c", f"rm -v {prefix}*"] + if dry_run: + print(f"[DRY RUN] {' '.join(cmd)}") + else: + try: + run(cmd) + except subprocess.CalledProcessError as e: + print(f"⚠️ File deletion failed (likely missing): {' '.join(cmd)} -> {e}") + # Continue with DB cleanup and CSV logging even if file is already gone. + + +def ensure_audit_table(instance): + run_sql(instance, SQL_CREATE_TABLE_IF_NOT_EXIST) + + +def archive_and_delete(instance, fileresourceid): + fid = int(fileresourceid) + sql = (SQL_INSERT_AUDIT % {"fid": fid}) + (SQL_DELETE_ORIGINAL % {"fid": fid}) + run_sql(instance, sql) + + +def main(): + parser = argparse.ArgumentParser(description="Find/delete orphan file resources in d2-docker instance and log to CSV.") + parser.add_argument("--instance", required=True, help="d2-docker instance name (e.g. docker.eyeseetea.com/project/dhis2-data:2.41-test)") + parser.add_argument("--csv-path", help="CSV file to log orphan entries.") + parser.add_argument("--force", action="store_true", help="Actually delete files in container.") + parser.add_argument("--save-all-as-notified", action="store_true", help="Store CSV entries with notified=true even if not sent.") + args = parser.parse_args() + + container_id = find_container_id(args.instance) + if not container_id: + print("❌ Could not find core container for instance", file=sys.stderr) + sys.exit(1) + + dry_run = not args.force + print(f"Running in {'DRY-RUN' if dry_run else 'FORCE'} mode against container {container_id}") + + ensure_audit_table(args.instance) + + rows = [] + rows.extend(get_orphan_documents(args.instance)) + rows.extend(get_orphan_datavalues(args.instance)) + print(f"Found {len(rows)} orphan entries") + + if args.save_all_as_notified: + mark_items_notified(rows) + else: + if args.csv_path: + merge_notified_flags(args.csv_path, rows, unique_keys=("id", "uid")) + + for row in rows: + delete_files(container_id, row, dry_run) + if dry_run: + print(f"[DRY RUN] Skipping DB archive/delete for {row['id']} ({row.get('name', '')}) action={row.get('action')}") + continue + try: + archive_and_delete(args.instance, row["id"]) + row["action"] = "moved_and_deleted" + except Exception as e: + print(f"❌ Failed DB archive/delete for {row['id']}: {e}", file=sys.stderr) + if not dry_run: + sys.exit(1) + + if args.csv_path: + items = deduplicate_items(args.csv_path, rows, unique_keys=("id", "uid"), overwrite=False) + append_items(args.csv_path, items, overwrite=False) + + emit_summary(rows, "DRY-RUN" if dry_run else "FORCE", log_fn=print) + + +if __name__ == "__main__": + main() diff --git a/DHIS2/file_garbage_remover/file_garbage_remover_tomcat.py b/DHIS2/file_garbage_remover/file_garbage_remover_tomcat.py new file mode 100644 index 00000000..43e1d1d7 --- /dev/null +++ b/DHIS2/file_garbage_remover/file_garbage_remover_tomcat.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python3 +import argparse +import json +import os +import shutil +import sys +from datetime import datetime + +import psycopg2 + +from common import ( + build_datavalue_items, + build_document_items, + emit_summary, + load_config, + log, + mark_items_notified, +) +from csv_utils import append_items, deduplicate_items, merge_notified_flags +from sql_queries import ( + SQL_CREATE_TABLE_IF_NOT_EXIST, + SQL_DATA_VALUE_UIDS, + SQL_DELETE_ORIGINAL, + SQL_EVENT_FILE_UIDS, + SQL_FIND_DATA_VALUES_FILE_RESOURCES, + SQL_FIND_ORPHANS_DOCUMENTS, + SQL_INSERT_AUDIT, + SQL_TRACKER_ATTRIBUTE_UIDS, +) + + +def get_events(cursor):
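+    """Return the raw eventdatavalues JSON of non-deleted events whose program stage includes FILE_RESOURCE or IMAGE data elements."""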
cursor.execute(SQL_EVENT_FILE_UIDS) + return [row[0] for row in cursor.fetchall() if row[0]] + + +def get_event_blob(cursor): + event_texts = get_events(cursor) + return "\n".join(json.dumps(e) for e in event_texts) + + +def get_all_datavalue_uids(cursor): + cursor.execute(SQL_DATA_VALUE_UIDS) + return set(row[0] for row in cursor.fetchall() if row[0]) + + +def get_all_tracker_uids(cursor): + cursor.execute(SQL_TRACKER_ATTRIBUTE_UIDS) + return set(row[0] for row in cursor.fetchall() if row[0]) + + +def get_matching_files(storage_key, base_dir, folder): + # Folder (document or dataValue) + storage_key = storage_key.replace(folder + "/", "") + base_dir = os.path.join(base_dir, folder) + matching = [] + for filename in os.listdir(base_dir): + if filename.startswith(storage_key): + fullpath = os.path.join(base_dir, filename) + if os.path.isfile(fullpath): + matching.append(fullpath) + return matching + + +def update_db(fileresourceid, cursor, conn, dry_run=False): + if dry_run: + log("[DRY RUN] Would execute:") + log(f" {SQL_INSERT_AUDIT.strip()}") + log(f" with parameters: {{'fid': '{fileresourceid}'}}") + log(f" {SQL_DELETE_ORIGINAL.strip()} ") + log(f" with parameters: {{'fid': '{fileresourceid}'}}") + else: + try: + cursor.execute(SQL_INSERT_AUDIT, {"fid": fileresourceid}) + cursor.execute(SQL_DELETE_ORIGINAL, {"fid": fileresourceid}) + conn.commit() + log(f"Archived and deleted fileresourceid {fileresourceid}") + except Exception as e: + log(f"❌ Failed DB update for fileresourceid {fileresourceid}: {e}") + conn.rollback() + raise + + +def move_files(file_list, file_base_path, temp_file_path, folder, dry_run): + base_dir = os.path.join(file_base_path, folder) + dest_dir = os.path.join(temp_file_path, folder) + for src in file_list: + rel_path = os.path.relpath(src, base_dir) + log(rel_path) + dst = os.path.join(dest_dir, rel_path) + log(dst) + dst_dir = os.path.dirname(dst) + log(dst_dir) + try: + os.makedirs(dst_dir, exist_ok=True) + except Exception as e: + log(f"❌ Failed to create directory {dst_dir}: {e}") + raise + + log(f"{'[DRY RUN] ' if dry_run else ''}Moving {src} -> {dst}") + if not dry_run: + try: + shutil.move(src, dst) + except Exception as e: + log(f"❌ Failed to move {src}: {e}") + raise + + +def ensure_audit_table_exists(cursor, dry_run=False): + if dry_run: + log("[DRY RUN] Would execute:") + log(SQL_CREATE_TABLE_IF_NOT_EXIST) + else: + log("Ensuring 'fileresourcesaudit' table exists...") + cursor.execute(SQL_CREATE_TABLE_IF_NOT_EXIST) + log("'fileresourcesaudit' table ready.") + + +def main(): + parser = argparse.ArgumentParser(description="Move orphaned DHIS2 file resources and archive DB entries.") + parser.add_argument("--force", action="store_true", help="Apply changes: move files and modify DB.") + parser.add_argument("--test", action="store_true", help="Run in dry-run mode (no changes). 
Default if --force not provided.") + parser.add_argument("--config", required=True, help="Path to config.json file.") + parser.add_argument("--csv-path", help="Optional CSV file to record processed entries.") + parser.add_argument("--save-all-as-notified", action="store_true", help="Store CSV entries with notified=true even if not sent.") + args = parser.parse_args() + + config = load_config(args.config) + required_keys = ["db_host", "db_port", "db_name", "db_user", "file_base_path", "temp_file_path"] + missing_keys = [k for k in required_keys if not config.get(k)] + + if missing_keys: + log(f"❌ Missing required config keys: {', '.join(missing_keys)}") + sys.exit(1) + + db_password = os.environ.get("DB_PASSWORD_FG") + if not db_password: + log("❌ Missing required environment variable: DB_PASSWORD_FG") + sys.exit(1) + + db_url = f"postgresql://{config['db_user']}:{db_password}@{config['db_host']}:{config['db_port']}/{config['db_name']}" + file_base_path = config["file_base_path"] + temp_file_path = config["temp_file_path"] + + if not os.path.isdir(file_base_path) or not os.path.isdir(temp_file_path): + log("❌ Both 'file_base_path' and 'temp_file_path' must exist and be directories.") + log(f" file_base_path: {file_base_path} -> {'OK' if os.path.isdir(file_base_path) else 'INVALID'}") + log(f" temp_file_path: {temp_file_path} -> {'OK' if os.path.isdir(temp_file_path) else 'INVALID'}") + sys.exit(1) + + if args.force and args.test: + log("❌ Choose either --force or --test, not both.") + sys.exit(1) + + dry_run = not args.force # default to test mode unless --force is provided + log(f"{'Running in TEST mode' if dry_run else 'Running in FORCE mode'}") + log(f"Connecting to database: {db_url}") + log(f"File base path: {file_base_path}") + log(f"Temporary move path: {temp_file_path}") + + summary = {"mode": "TEST" if dry_run else "FORCE", "items": []} + + with psycopg2.connect(dsn=db_url) as conn: + with conn.cursor() as cur: + ensure_audit_table_exists(cur, dry_run) + remove_documents(file_base_path, temp_file_path, dry_run, cur, conn, summary) + remove_datavalues(file_base_path, temp_file_path, dry_run, cur, conn, summary) + + if args.csv_path: + if args.save_all_as_notified: + mark_items_notified(summary.get("items", [])) + else: + merge_notified_flags(args.csv_path, summary.get("items", []), unique_keys=("id", "uid"), log=log) + items = deduplicate_items(args.csv_path, summary.get("items", []), unique_keys=("id", "uid"), overwrite=False, log=log) + append_items(args.csv_path, items, overwrite=False, log=log) + emit_summary(summary.get("items", []), summary.get("mode", "TEST"), log_fn=log) + + +def remove_documents(file_base_path, temp_file_path, dry_run, cur, conn, summary): + log("Querying orphaned fileresource entries...") + cur.execute(SQL_FIND_ORPHANS_DOCUMENTS) + raw_rows = cur.fetchall() + items = build_document_items(raw_rows) + + if not items: + log("✅ No orphaned fileresource entries found.") + return + + log(f"Found {len(items)} \"document\" orphaned entries.") + process_orphan_items(items, file_base_path, temp_file_path, dry_run, cur, conn, summary) + + +def remove_datavalues(file_base_path, temp_file_path, dry_run, cur, conn, summary): + log("Querying orphaned fileresource entries...") + cur.execute(SQL_FIND_DATA_VALUES_FILE_RESOURCES) + raw_rows = cur.fetchall() + # get all file resource datavalues + datavalue_uids = get_all_datavalue_uids(cur) + # get all events with file dataelement values + event_blob = get_event_blob(cur) + + tracker_uids = get_all_tracker_uids(cur) + items = 
build_datavalue_items(raw_rows, datavalue_uids, tracker_uids, event_blob) + + if not items: + log("✅ No fileResources entries found.") + return + + log(f"Found {len(items)} \"data_value\" orphaned entries.") + process_orphan_items(items, file_base_path, temp_file_path, dry_run, cur, conn, summary) + + +def process_orphan_items(orphan_items, file_base_path, temp_file_path, dry_run, cur, conn, summary): + count = 0 + for item in orphan_items: + fileresourceid = item.get("id") + storagekey = item.get("storagekey", "") + name = item.get("name", "") + folder = item.get("folder", "") + if not fileresourceid: + log(f"⚠️ Skipping row with empty/null fileresourceid: {fileresourceid}") + continue + try: + matches = get_matching_files(storagekey, file_base_path, folder) + if not matches: + log(f"No files found for storage_key: {storagekey} (folder={folder})") + else: + move_files(matches, file_base_path, temp_file_path, folder, dry_run) + # Always update the db also if the files not existed on disk + update_db(fileresourceid, cur, conn, dry_run) + rel_matches = [os.path.join(folder, os.path.relpath(m, os.path.join(file_base_path, folder))) for m in matches] + item["files"] = rel_matches + item["action"] = "would_move_and_delete" if dry_run else "moved_and_deleted" + summary["items"].append(item) + except Exception as e: + log(f"❌ Aborting due to error with fileresourceid {fileresourceid}: {e}") + sys.exit(1) + count = count + 1 + log(f"{count} file(s) {'would be moved' if dry_run else 'were successfully moved and deleted'}") + + +if __name__ == "__main__": + main() diff --git a/DHIS2/file_garbage_remover/main.py b/DHIS2/file_garbage_remover/main.py new file mode 100644 index 00000000..64d6a35e --- /dev/null +++ b/DHIS2/file_garbage_remover/main.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +import argparse +import sys + +import cleaner +from notifier import mark_notified, pending_from_csv, send_notification + + +def build_parser(): + parser = argparse.ArgumentParser(description="DHIS2 file garbage remover and notifier.") + # Cleanup options + parser.add_argument("--force", action="store_true", help="Apply changes: move files and modify DB.") + parser.add_argument("--test", action="store_true", help="Run in dry-run mode (default unless --force).") + parser.add_argument("--config", help="Path to config.json file. Required unless --notify-only.") + parser.add_argument("--csv-path", help="CSV file to record processed entries or to read notifications from.") + parser.add_argument("--mode", choices=["tomcat", "docker"], default="tomcat", help="Cleanup mode: tomcat (default) or docker.") + parser.add_argument("--docker-instance", help="d2-docker instance name (required for --mode=docker).") + + # Notification options + parser.add_argument("--notify-only", action="store_true", help="Skip cleanup and only send notifications from CSV.") + parser.add_argument("--notify-webhook-url", help="Webhook URL to send the notification. 
If not provided, falls back to config.json key 'webhook-url'.") + parser.add_argument("--notify-title", help="Notification title (required if using --notify-webhook-url).") + parser.add_argument("--notify-http-proxy", help="HTTP proxy (optional).") + parser.add_argument("--notify-https-proxy", help="HTTPS proxy (optional).") + parser.add_argument("--notify-test", action="store_true", help="Dry-run notification: print payload instead of sending.") + parser.add_argument("--save-all-as-notified", action="store_true", help="Mark CSV entries as notified even if no notification is sent.") + parser.add_argument("--notify-max-lines", type=int, default=100, help="Maximum number of lines to include in notification content (default 100).") + + return parser + + +def resolve_webhook(config_path, cli_webhook): + return cli_webhook or _from_config(config_path, ["webhook-url", "webhook_url"]) + + +def _from_config(config_path, keys): + if not config_path: + return None + try: + cfg = cleaner.load_config(config_path) + except Exception as e: + print(f"❌ Failed to load config file: {e}", file=sys.stderr) + sys.exit(1) + for key in keys: + if key in cfg: + return cfg.get(key) + return None + + +def resolve_proxy(config_path, cli_value, keys): + return cli_value or _from_config(config_path, keys) + + +def run_notify_flow(csv_path, config_path=None, webhook_url=None, title=None, http_proxy=None, https_proxy=None, notify_test=False, notify_max_lines=100): + names, ids, fieldnames, rows = pending_from_csv(csv_path) + if not names: + return + max_lines = notify_max_lines if notify_max_lines and notify_max_lines > 0 else None + truncated = names[:max_lines] if max_lines else names + content = "\n".join(truncated) + if max_lines and len(names) > max_lines: + content += f"\n... 
(truncated {len(names) - max_lines} more)" + webhook = resolve_webhook(config_path, webhook_url) + http_proxy = resolve_proxy(config_path, http_proxy, ["notify-http-proxy", "notify_http_proxy", "http-proxy", "http_proxy"]) + https_proxy = resolve_proxy(config_path, https_proxy, ["notify-https-proxy", "notify_https_proxy", "https-proxy", "https_proxy"]) + if notify_test: + print(f"[TEST] Would send notification to {webhook or ''}") + print(f"[TEST] Title: {title or ''}") + print(f"[TEST] Content:\n{content}") + if webhook: + if not title: + print("❌ --notify-title is required when using --notify-webhook-url", file=sys.stderr) + sys.exit(1) + if not notify_test: + try: + send_notification( + webhook_url=webhook, + title=title, + content=content, + http_proxy=http_proxy, + https_proxy=https_proxy, + ) + except Exception as e: + print(f"❌ Failed to send notification: {e}", file=sys.stderr) + sys.exit(1) + # mark entries as notified + if ids: + mark_notified(csv_path, rows, fieldnames, ids) + + +def main(): + parser = build_parser() + args = parser.parse_args() + + # Notify-only path + if args.notify_only: + if not args.csv_path: + print("❌ --csv-path is required for notify-only mode", file=sys.stderr) + sys.exit(1) + run_notify_flow( + csv_path=args.csv_path, + config_path=args.config, + webhook_url=args.notify_webhook_url, + title=args.notify_title, + http_proxy=args.notify_http_proxy, + https_proxy=args.notify_https_proxy, + notify_test=args.notify_test, + notify_max_lines=args.notify_max_lines, + ) + sys.exit(0) + + # Cleanup path (default) + if not args.config: + print("❌ --config is required to run cleanup", file=sys.stderr) + sys.exit(1) + cleaner.run_cleanup(args) + + wants_notify = any([ + args.notify_webhook_url, + args.notify_title, + args.notify_http_proxy, + args.notify_https_proxy, + args.notify_test, + ]) + if args.save_all_as_notified: + return #nothing to report + elif wants_notify: + if not args.csv_path: + print("❌ --csv-path is required to send notifications after cleanup", file=sys.stderr) + sys.exit(1) + run_notify_flow( + csv_path=args.csv_path, + config_path=args.config, + webhook_url=args.notify_webhook_url, + title=args.notify_title, + http_proxy=args.notify_http_proxy, + https_proxy=args.notify_https_proxy, + notify_test=args.notify_test, + notify_max_lines=args.notify_max_lines, + ) + + +if __name__ == "__main__": + main() diff --git a/DHIS2/file_garbage_remover/notifier.py b/DHIS2/file_garbage_remover/notifier.py new file mode 100644 index 00000000..870f0f52 --- /dev/null +++ b/DHIS2/file_garbage_remover/notifier.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +import os +import sys +import requests + +from csv_utils import read_rows, write_rows + + +def pending_from_csv(csv_path): + rows, fieldnames = read_rows(csv_path) + + notify_ids = set() + names_to_print = [] + for row in rows: + notified = str(row.get("notified", "")).lower() == "true" + if notified: + continue + name = row.get("name", "") + if name: + names_to_print.append(name) + notify_ids.add(str(row.get("id"))) + + return names_to_print, notify_ids, fieldnames, rows + + +def mark_notified(csv_path, rows, fieldnames, notify_ids): + if not notify_ids: + return + for row in rows: + if str(row.get("id")) in notify_ids: + row["notified"] = "true" + write_rows(csv_path, rows, fieldnames=fieldnames) + + +def send_notification(webhook_url, title, content, http_proxy=None, https_proxy=None): + proxies = { + "http": http_proxy or os.getenv("http_proxy", ""), + "https": https_proxy or os.getenv("https_proxy", "") + } 
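+    # Proxies passed as arguments take precedence; otherwise fall back to the http_proxy/https_proxy environment variables.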
+ payload = {"text": f"**{title}**\n{content}"} + response = requests.post( + webhook_url, + json=payload, + headers={"Content-Type": "application/json"}, + proxies=proxies, + verify=True, + timeout=10, + ) + response.raise_for_status() + return response.status_code + + +def main(): + if len(sys.argv) < 2: + sys.exit(1) + csv_path = sys.argv[1] + names, ids, fieldnames, rows = pending_from_csv(csv_path) + if not names: + sys.exit(1) + for name in names: + print(name) + mark_notified(csv_path, rows, fieldnames, ids) + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/DHIS2/file_garbage_remover/sql_queries.py b/DHIS2/file_garbage_remover/sql_queries.py new file mode 100644 index 00000000..5eadcba1 --- /dev/null +++ b/DHIS2/file_garbage_remover/sql_queries.py @@ -0,0 +1,49 @@ +SQL_FIND_ORPHANS_DOCUMENTS = """ + SELECT fileresourceid, uid, storagekey, name, created + FROM fileresource fr + WHERE NOT EXISTS ( + SELECT 1 FROM document d WHERE d.fileresource = fr.fileresourceid + ) + AND fr.uid NOT IN ( + SELECT url FROM document + ) + AND fr.domain = 'DOCUMENT' and fr.storagekey like '%document%' + AND COALESCE(fr.lastupdated, fr.created, NOW()) < (NOW() - INTERVAL '24 hours'); +""" + +SQL_FIND_DATA_VALUES_FILE_RESOURCES = """ + SELECT fileresourceid, uid, storagekey, name, created + FROM fileresource fr + WHERE fr.domain = 'DATA_VALUE' and fr.storagekey like '%dataValue%' + AND COALESCE(fr.lastupdated, fr.created, NOW()) < (NOW() - INTERVAL '24 hours'); +""" + +SQL_INSERT_AUDIT = """ + INSERT INTO fileresourcesaudit SELECT * FROM fileresource WHERE fileresourceid = %(fid)s; +""" + +SQL_DELETE_ORIGINAL = """ + DELETE FROM fileresource WHERE fileresourceid = %(fid)s; +""" + +SQL_CREATE_TABLE_IF_NOT_EXIST = """ + CREATE TABLE IF NOT EXISTS fileresourcesaudit AS TABLE fileresource WITH NO DATA; +""" + +SQL_EVENT_FILE_UIDS = """ + SELECT eventdatavalues + FROM event + WHERE programstageid + IN (SELECT programstageid FROM programstagedataelement WHERE dataelementid + IN (SELECT dataelementid FROM dataelement WHERE valuetype='FILE_RESOURCE' or valuetype='IMAGE')) and deleted='f'; +""" + +SQL_DATA_VALUE_UIDS = """ + SELECT dv.value + FROM datavalue dv + WHERE dv.value IN (SELECT uid FROM fileresource WHERE domain='DATA_VALUE') +""" + +SQL_TRACKER_ATTRIBUTE_UIDS = """ + SELECT value FROM trackedentityattributevalue WHERE value IN (SELECT uid FROM fileresource WHERE domain='DATA_VALUE'); +"""
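For reference, `SQL_INSERT_AUDIT` and `SQL_DELETE_ORIGINAL` are meant to run as a pair in one transaction, which is what `update_db()` in the tomcat script does. A minimal standalone sketch of that pattern (the DSN and the id `12345` are placeholders, not values from this change):
```
import psycopg2

from sql_queries import SQL_CREATE_TABLE_IF_NOT_EXIST, SQL_DELETE_ORIGINAL, SQL_INSERT_AUDIT

# Placeholder connection string; the real scripts build it from config.json plus DB_PASSWORD_FG.
DSN = "postgresql://dhis:secret@localhost:5432/dhis2"

with psycopg2.connect(DSN) as conn:
    with conn.cursor() as cur:
        cur.execute(SQL_CREATE_TABLE_IF_NOT_EXIST)        # no-op if fileresourcesaudit already exists
        cur.execute(SQL_INSERT_AUDIT, {"fid": 12345})     # copy the row into fileresourcesaudit
        cur.execute(SQL_DELETE_ORIGINAL, {"fid": 12345})  # then remove it from fileresource
    conn.commit()  # both statements are committed together, mirroring update_db()
```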