diff --git a/airflow/dags/run_ogc_process.py b/airflow/dags/run_ogc_process.py new file mode 100644 index 00000000..cc54f19d --- /dev/null +++ b/airflow/dags/run_ogc_process.py @@ -0,0 +1,314 @@ +""" +DAG with custom SPSOGCOperator that subclasses KubernetesPodOperator +for OGC process execution with SPS-specific functionality. +""" + +import json +import logging +import re +from datetime import datetime + +import requests +from airflow.models.baseoperator import chain +from airflow.models.dag import DAG +from airflow.models.param import Param +from airflow.operators.python import PythonOperator, get_current_context +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.secret import Secret as AirflowK8sSecret +from airflow.utils.trigger_rule import TriggerRule +from kubernetes.client import models as k8s +from unity_sps_utils import POD_LABEL, POD_NAMESPACE, get_affinity + +PROCESSES_ENDPOINT = "https://api.dit.maap-project.org/api/ogc/processes" + + +def fetch_ogc_processes(): + """Fetch available processes from the OGC API and create mapping.""" + try: + response = requests.get(PROCESSES_ENDPOINT, timeout=30) + response.raise_for_status() + + processes_data = response.json() + process_mapping = {} + dropdown_options = [] + + for process in processes_data.get("processes", []): + process_id = process.get("id") + process_version = process.get("version") + + # Extract numerical ID from links + numerical_id = None + for link in process.get("links", []): + if link.get("rel") == "self": + href = link.get("href", "") + # Extract number from href like "/ogc/processes/7" + match = re.search(r"/processes/(\d+)$", href) + if match: + numerical_id = int(match.group(1)) + break + + if process_id and numerical_id: + display_name = f"{process_id}:{process_version}" if process_version else process_id + dropdown_options.append(display_name) + process_mapping[display_name] = numerical_id + + return process_mapping, dropdown_options + + except requests.RequestException as e: + logging.error(f"Failed to fetch processes: {e}") + # Return fallback mapping + return {"example-process:1.0": 1}, ["example-process:1.0"] + except Exception as e: + logging.error(f"Error processing OGC processes: {e}") + return {"example-process:1.0": 1}, ["example-process:1.0"] + + +# Constants +K8S_SECRET_NAME = "sps-app-credentials" +DOCKER_IMAGE = "jplmdps/ogc-job-runner:latest" +PROCESS_MAPPING, DROPDOWN_OPTIONS = fetch_ogc_processes() + +# SPS-specific secrets +secret_env_vars = [ + AirflowK8sSecret( + deploy_type="env", + deploy_target="MAAP_PGT", + secret=K8S_SECRET_NAME, + key="MAAP_PGT", + ) +] + + +class SPSOGCOperator(KubernetesPodOperator): + """ + Custom operator for SPS OGC process execution that subclasses KubernetesPodOperator. + + This operator encapsulates all SPS-specific configuration and provides a clean + interface for OGC process submission and monitoring. + """ + + def __init__( + self, + operation_type: str, + selected_process: str = None, + job_inputs: str = None, + job_queue: str = None, + job_id: str = None, + **kwargs, + ): + """ + Initialize the SPSOGCOperator. + + Args: + operation_type: Either "submit" or "monitor" + selected_process: Process selection for submit operations + job_inputs: JSON string of job inputs for submit operations + job_queue: Queue name for submit operations + job_id: Job ID for monitor operations + """ + self.operation_type = operation_type + self.selected_process = selected_process + self.job_inputs = job_inputs + self.job_queue = job_queue + self.job_id = job_id + + # Set SPS-specific defaults + kwargs.setdefault("namespace", POD_NAMESPACE) + kwargs.setdefault("image", DOCKER_IMAGE) + kwargs.setdefault("service_account_name", "airflow-worker") + kwargs.setdefault("secrets", secret_env_vars) + kwargs.setdefault("in_cluster", True) + kwargs.setdefault("get_logs", True) + kwargs.setdefault("startup_timeout_seconds", 600) + kwargs.setdefault("container_security_context", {"privileged": True}) + kwargs.setdefault("container_logs", True) + kwargs.setdefault("labels", {"pod": POD_LABEL}) + kwargs.setdefault("annotations", {"karpenter.sh/do-not-disrupt": "true"}) + kwargs.setdefault( + "affinity", + get_affinity( + capacity_type=["spot"], + anti_affinity_label=POD_LABEL, + ), + ) + kwargs.setdefault("on_finish_action", "keep_pod") + kwargs.setdefault("is_delete_operator_pod", False) + + # Build operation-specific environment variables + if operation_type == "submit": + kwargs["env_vars"] = self._build_submit_env_vars() + kwargs["name"] = f"ogc-submit-pod-{kwargs.get('task_id', 'unknown')}" + kwargs.setdefault("do_xcom_push", True) # Submit tasks need to return job ID + elif operation_type == "monitor": + kwargs["env_vars"] = self._build_monitor_env_vars() + kwargs["name"] = f"ogc-monitor-pod-{kwargs.get('task_id', 'unknown')}" + else: + raise ValueError(f"Invalid operation_type: {operation_type}. Must be 'submit' or 'monitor'") + + super().__init__(**kwargs) + + def _build_submit_env_vars(self): + """Build environment variables for job submission.""" + # Resolve numerical process ID from selected process + numerical_process_id = self._resolve_process_id() + + return [ + k8s.V1EnvVar( + name="SUBMIT_JOB_URL", + value="https://api.dit.maap-project.org/api/ogc/processes/{process_id}/execution", + ), + k8s.V1EnvVar(name="PROCESS_ID", value=str(numerical_process_id)), + k8s.V1EnvVar(name="JOB_INPUTS", value=self.job_inputs or "{}"), + k8s.V1EnvVar(name="QUEUE", value=self.job_queue or "maap-dps-worker-cardamom"), + k8s.V1EnvVar(name="SUBMIT_JOB", value="true"), + ] + + def _build_monitor_env_vars(self): + """Build environment variables for job monitoring.""" + return [ + k8s.V1EnvVar( + name="MONITOR_JOB_URL", + value="https://api.dit.maap-project.org/api/ogc/jobs/{job_id}", + ), + k8s.V1EnvVar(name="JOB_ID", value=self.job_id), + k8s.V1EnvVar(name="SUBMIT_JOB", value="false"), + ] + + def _resolve_process_id(self): + """Resolve the selected process to a numerical process ID.""" + if not self.selected_process: + raise ValueError("selected_process is required for submit operations") + + # Handle templated values - they won't be resolved yet during __init__ + if "{{" in str(self.selected_process): + # Return a template that will be resolved at runtime + return "{{ ti.xcom_pull(task_ids='Setup', key='return_value')['numerical_process_id'] }}" + + # Direct lookup for non-templated values + numerical_id = PROCESS_MAPPING.get(self.selected_process) + if numerical_id is None: + self.log.warning(f"Process '{self.selected_process}' not found in mapping, defaulting to ID 1") + return 1 + + return numerical_id + + def execute(self, context): + """Execute the operator with additional SPS-specific logging.""" + self.log.info(f"Starting SPS OGC {self.operation_type} operation") + + if self.operation_type == "submit": + self.log.info(f"Selected process: {self.selected_process}") + self.log.info(f"Job queue: {self.job_queue}") + self.log.info(f"Job inputs: {self.job_inputs}") + elif self.operation_type == "monitor": + self.log.info(f"Monitoring job ID: {self.job_id}") + + # Call parent execute method + result = super().execute(context) + + self.log.info(f"SPS OGC {self.operation_type} operation completed") + return result + + +dag_default_args = { + "owner": "unity-sps", + "depends_on_past": False, + "start_date": datetime.utcfromtimestamp(0), +} + +# --- DAG Definition --- + +dag = DAG( + dag_id="run_ogc_process", + description="Submits a job to an OGC process and monitors (using custom SPSOGCOperator)", + dag_display_name="Run an OGC Process (Custom Operator from KubernetesPodOperator)", + tags=["ogc", "job", "custom-operator"], + is_paused_upon_creation=False, + catchup=False, + schedule=None, + max_active_runs=10, + default_args=dag_default_args, + params={ + "selected_process": Param( + default=DROPDOWN_OPTIONS[0] if DROPDOWN_OPTIONS else "Error loading dropdown", + enum=DROPDOWN_OPTIONS, + title="Process Selection", + description="Select a process to execute.", + ), + "queue": Param( + "maap-dps-worker-cardamom", + type="string", + title="Queue", + description="The MAAP queue to submit the job to", + ), + "job_inputs": Param( + "{}", + type="string", + title="Job Inputs", + description="A JSON string representing the inputs payload for the job.", + ), + }, +) + +# --- Task Definitions --- + + +def setup(ti=None, **context): + """Task that logs DAG parameters and process mapping information.""" + + logging.info("Starting OGC job submission and monitoring DAG (Custom Operator Version).") + logging.info(f"Parameters received: {context['params']}") + logging.info(f"Available processes: {len(DROPDOWN_OPTIONS)}") + logging.info(f"Process mapping: {json.dumps(PROCESS_MAPPING, indent=2)}") + + context = get_current_context() + logging.info(f"DAG Run parameters: {json.dumps(context['params'], sort_keys=True, indent=4)}") + + selected_process = context["params"].get("selected_process") + if selected_process in PROCESS_MAPPING: + numerical_id = PROCESS_MAPPING[selected_process] + logging.info(f"Selected process '{selected_process}' maps to numerical ID: {numerical_id}") + return {"numerical_process_id": numerical_id} + else: + logging.warning(f"Selected process '{selected_process}' not found in mapping") + return {"numerical_process_id": 1} + + +setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) + +submit_job_task = SPSOGCOperator( + task_id="submit_job_task", + operation_type="submit", + selected_process="{{ params.selected_process }}", + job_inputs="{{ params.job_inputs }}", + job_queue="{{ params.queue }}", + dag=dag, +) + +monitor_job_task = SPSOGCOperator( + task_id="monitor_job_task", + operation_type="monitor", + job_id="{{ ti.xcom_pull(task_ids='submit_job_task', key='return_value')['job_id'] }}", + dag=dag, +) + + +def cleanup(**context): + """A placeholder cleanup task""" + logging.info("Cleanup executed.") + + # Log final results if available + submit_result = context["ti"].xcom_pull(task_ids="submit_job_task", key="return_value") + monitor_result = context["ti"].xcom_pull(task_ids="monitor_job_task", key="return_value") + + if submit_result: + logging.info(f"Job submission result: {submit_result}") + if monitor_result: + logging.info(f"Job monitoring result: {monitor_result}") + + +cleanup_task = PythonOperator( + task_id="Cleanup", python_callable=cleanup, dag=dag, trigger_rule=TriggerRule.ALL_DONE +) + +chain(setup_task, submit_job_task, monitor_job_task, cleanup_task) diff --git a/airflow/docker/run_ogc_process/Dockerfile b/airflow/docker/run_ogc_process/Dockerfile new file mode 100644 index 00000000..2151ee8e --- /dev/null +++ b/airflow/docker/run_ogc_process/Dockerfile @@ -0,0 +1,8 @@ +FROM alpine:3.18 + +RUN apk add --no-cache curl jq + +COPY run_ogc_process_entrypoint.sh /usr/share/ogc/run_ogc_process_entrypoint.sh +WORKDIR /usr/share/ogc +RUN chmod +x /usr/share/ogc/run_ogc_process_entrypoint.sh +ENTRYPOINT ["/usr/share/ogc/run_ogc_process_entrypoint.sh"] diff --git a/airflow/docker/run_ogc_process/run_ogc_process_entrypoint.sh b/airflow/docker/run_ogc_process/run_ogc_process_entrypoint.sh new file mode 100644 index 00000000..590e239e --- /dev/null +++ b/airflow/docker/run_ogc_process/run_ogc_process_entrypoint.sh @@ -0,0 +1,70 @@ +#!/bin/sh + +set -e + +if [ "$SUBMIT_JOB" = "true" ] || [ "$SUBMIT_JOB" = "True" ]; then + echo "Submitting job" + + SUBMIT_JOB_URL=$(echo "$SUBMIT_JOB_URL" | sed "s/{process_id}/$PROCESS_ID/") + SUBMIT_JOB_ARGUMENTS=$(jq -n \ + --arg queue "$QUEUE" \ + --argjson inputs "$JOB_INPUTS" \ + '{queue: $queue, inputs: $inputs}') + + echo "Submitting the job to ${SUBMIT_JOB_URL}" + + response=$(curl --location ${SUBMIT_JOB_URL} \ + --header "proxy-ticket: ${MAAP_PGT}" \ + --header "Content-Type: application/json" \ + --data "${SUBMIT_JOB_ARGUMENTS}") + + echo "API Response: $response" + job_id=$(echo "$response" | jq -r .id) + + if [ "$job_id" = "null" ] || [ -z "$job_id" ]; then + echo "Failed to get jobID from response." + exit 1 + fi + + echo "Job submitted successfully. Job ID: ${job_id}" + + # Write the job_id to the XCom return file for the next task + mkdir -p /airflow/xcom/ + printf '{"job_id": "%s"}' "$job_id" > /airflow/xcom/return.json +elif [ "$SUBMIT_JOB" = "false" ] || [ "$SUBMIT_JOB" = "False" ]; then + echo "Monitoring job status" + + MONITOR_JOB_URL=$(echo "$MONITOR_JOB_URL" | sed "s/{job_id}/$JOB_ID/") + + TIMEOUT=3600 + POLL_INTERVAL=30 + SECONDS=0 + + while [ $SECONDS -lt $TIMEOUT ]; do + echo "Checking status..." + response=$(curl --location ${MONITOR_JOB_URL} \ + --header "proxy-ticket: ${MAAP_PGT}" \ + --header "Content-Type: application/json") + + status=$(echo "$response" | jq -r .status) + + echo "Current status is: $status" + + if [ "$status" = "successful" ]; then + echo "Job completed successfully!" + exit 0 + elif [ "$status" = "failed" ]; then + echo "Job failed!" + echo "Error details: $(echo "$response" | jq .)" + exit 1 + fi + + sleep $POLL_INTERVAL + SECONDS=$((SECONDS + POLL_INTERVAL)) + done + + echo "Job monitoring timed out after $TIMEOUT seconds." + exit 1 +else + echo "SUBMIT_JOB variable must be specified and set to true or false" +fi diff --git a/ogc-application-packages/run_ogc_process.json b/ogc-application-packages/run_ogc_process.json new file mode 100644 index 00000000..d196e108 --- /dev/null +++ b/ogc-application-packages/run_ogc_process.json @@ -0,0 +1,53 @@ +{ + "executionUnit": { + "image": "jplmdps/ogc-job-runner:latest", + "type": "docker" + }, + "processDescription": { + "description": "Submits a job to an OGC process and monitors (using custom SPSOGCOperator)", + "id": "run_ogc_process", + "inputs": { + "job_inputs": { + "description": "A JSON string representing the inputs payload for the job.", + "maxOccurs": 1, + "minOccurs": 1, + "schema": { + "type": "string" + }, + "title": "Job Inputs" + }, + "queue": { + "description": "The MAAP queue to submit the job to", + "maxOccurs": 1, + "minOccurs": 1, + "schema": { + "type": "string" + }, + "title": "Queue" + }, + "selected_process": { + "description": "Select a process to execute.", + "maxOccurs": 1, + "minOccurs": 1, + "schema": { + "type": "string" + }, + "title": "Process Selection" + } + }, + "jobControlOptions": [ + "async-execute" + ], + "outputs": { + "result": { + "description": "The result of the OGC process execution", + "schema": { + "$ref": "some-ref" + }, + "title": "Process Result" + } + }, + "title": "Run an OGC Process (Custom Operator from KubernetesPodOperator)", + "version": "1.0.0" + } +} diff --git a/terraform-unity/main.tf b/terraform-unity/main.tf index 4b72d525..fb4bdb9f 100644 --- a/terraform-unity/main.tf +++ b/terraform-unity/main.tf @@ -29,6 +29,11 @@ data "aws_ssm_parameter" "dockstore_token" { with_decryption = true } +data "aws_ssm_parameter" "maap_pgt" { + name = "/unity/ads/ogc/development/maap_pgt_mdps_dev_acct" + with_decryption = true +} + resource "kubernetes_secret" "sps-app-credentials" { metadata { name = "sps-app-credentials" @@ -39,6 +44,7 @@ resource "kubernetes_secret" "sps-app-credentials" { "DOCKERHUB_USERNAME" = data.aws_ssm_parameter.dockerhub_username.value "DOCKERHUB_TOKEN" = data.aws_ssm_parameter.dockerhub_api_key.value "DOCKSTORE_TOKEN" = data.aws_ssm_parameter.dockstore_token.value + "MAAP_PGT" = data.aws_ssm_parameter.maap_pgt.value } type = "Opaque" diff --git a/utils/post_deployment.sh b/utils/post_deployment.sh index 2adfe270..d8e27641 100755 --- a/utils/post_deployment.sh +++ b/utils/post_deployment.sh @@ -19,7 +19,7 @@ export TOKEN="$(python cognito-token-fetch.py -u $UNITY_USERNAME -c $UNITY_CLIE echo $TOKEN # list of processes to be registered -declare -a procs=("cwl_dag.json" "karpenter_test.json" "appgen_dag.json" "cwl_dag_modular.json" "db_cleanup_dag.json") +declare -a procs=("cwl_dag.json" "karpenter_test.json" "appgen_dag.json" "cwl_dag_modular.json" "db_cleanup_dag.json" "run_ogc_process.json") for proc in "${procs[@]}" do diff --git a/utils/post_deployment_terraform.sh b/utils/post_deployment_terraform.sh index 281e9cbd..57fc08fc 100755 --- a/utils/post_deployment_terraform.sh +++ b/utils/post_deployment_terraform.sh @@ -31,7 +31,7 @@ token=$(echo $token_response | jq -r '.AuthenticationResult.AccessToken') echo "Cognito token retrieved." # list of processes to be registered -declare -a procs=("cwl_dag.json" "karpenter_test.json" "appgen_dag.json" "cwl_dag_modular.json" "db_cleanup_dag.json") +declare -a procs=("cwl_dag.json" "karpenter_test.json" "appgen_dag.json" "cwl_dag_modular.json" "db_cleanup_dag.json" "run_ogc_process.json") for proc in "${procs[@]}" do