From 981653843cbd7d272486d2aea195b37b6d1119e0 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Sat, 26 Jul 2025 04:29:08 -0600 Subject: [PATCH 1/8] Adding selection of "karpenter-test" OGC process --- unity-test/conftest.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/unity-test/conftest.py b/unity-test/conftest.py index b01e175e..936a9165 100644 --- a/unity-test/conftest.py +++ b/unity-test/conftest.py @@ -28,8 +28,9 @@ def pytest_addoption(parser): "--venue", action="store", default=None, - choices=("dev", "test", "ops"), - help="The venue in which the cluster will be deployed (dev, test, ops).", + # Note: unity-py uses "prod" but sps software uses "ops" + choices=("dev", "test", "ops", "prod"), + help="The venue in which the cluster will be deployed (dev, test, ops, prod).", ) parser.addoption( "--developer", @@ -176,3 +177,15 @@ def cwl_dag_modular_process(ogc_processes): if p.id == "cwl_dag_modular": return p return None + + +@pytest.fixture(scope="session") +def karpenter_dag_process(ogc_processes): + """ + Selects the Karpenter Test DAG from the list of available OGC processes + """ + + for p in ogc_processes: + if p.id == "karpenter_test": + return p + return None From 433516037af1d8a78967cc2ac6e091e0a24d8a9a Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Sat, 26 Jul 2025 04:36:09 -0600 Subject: [PATCH 2/8] Adding Karpenter Test to the test features --- .../cwl_workflows_with_airflow_api.feature | 17 ++++++++--------- .../features/cwl_workflows_with_ogc_api.feature | 17 ++++++++--------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/unity-test/system/integration/features/cwl_workflows_with_airflow_api.feature b/unity-test/system/integration/features/cwl_workflows_with_airflow_api.feature index 91f5234c..67da27e1 100644 --- a/unity-test/system/integration/features/cwl_workflows_with_airflow_api.feature +++ b/unity-test/system/integration/features/cwl_workflows_with_airflow_api.feature @@ -1,11 +1,11 @@ -Feature: Execute CWL workflows using the Airflow API +Feature: Execute DAG workflows using the Airflow API As a UNITY SPS user - I want to execute a CWL workflow using the Airflow API + I want to execute a DAG workflow using the Airflow API And verify that it completes successfully So that I can inspect the results - Scenario Outline: Successful execution of a CWL workflow with the Airflow API + Scenario Outline: Successful execution of a DAG workflow with the Airflow API Given the Airflow API is up and running When I trigger a dag run for the workflow using the DAG Then I receive a response with status code 200 @@ -13,9 +13,8 @@ Feature: Execute CWL workflows using the Airflow API Examples: | test_case | test_dag | - | EMIT | cwl_dag | -# | SBG_E2E_SCALE | cwl_dag | - | SBG_PREPROCESS | cwl_dag | - | EMIT | cwl_dag_modular | - | SBG_PREPROCESS | cwl_dag_modular | -# | SBG_ISOFIT | cwl_dag_modular | + | KARPENTER | karpenter_test | +# | EMIT | cwl_dag | +# | SBG_PREPROCESS | cwl_dag | +# | EMIT | cwl_dag_modular | +# | SBG_PREPROCESS | cwl_dag_modular | diff --git a/unity-test/system/integration/features/cwl_workflows_with_ogc_api.feature b/unity-test/system/integration/features/cwl_workflows_with_ogc_api.feature index ec0d151c..7ed760f1 100644 --- a/unity-test/system/integration/features/cwl_workflows_with_ogc_api.feature +++ b/unity-test/system/integration/features/cwl_workflows_with_ogc_api.feature @@ -1,11 +1,11 @@ -Feature: Execute CWL workflows using the OGC API +Feature: Execute DAG workflows using the OGC API As a UNITY SPS user - I want to execute a CWL workflow using the OGC API + I want to execute a DAG workflow using the OGC API And verify that it completes successfully So that I can inspect the results - Scenario Outline: Successful execution of a CWL workflow with the OGC API + Scenario Outline: Successful execution of a DAG workflow with the OGC API Given the OGC API is up and running When I trigger a OGC job for the OGC process Then the job starts executing @@ -13,9 +13,8 @@ Feature: Execute CWL workflows using the OGC API Examples: | test_case | test_dag | - | EMIT | cwl_dag | -# | SBG_E2E_SCALE | cwl_dag | - | SBG_PREPROCESS | cwl_dag | - | EMIT | cwl_dag_modular | - | SBG_PREPROCESS | cwl_dag_modular | -# | SBG_ISOFIT | cwl_dag_modular | + | KARPENTER | karpenter_test | +# | EMIT | cwl_dag | +# | SBG_PREPROCESS | cwl_dag | +# | EMIT | cwl_dag_modular | +# | SBG_PREPROCESS | cwl_dag_modular | From 11f56038b879f7505b0f4c6ef471ba2c64b937f0 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Sat, 26 Jul 2025 04:40:41 -0600 Subject: [PATCH 3/8] Adding Karpenter Test to the tests --- .../test_cwl_workflows_with_airflow_api.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py b/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py index 27eba66c..cf3376fc 100644 --- a/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py +++ b/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py @@ -1,4 +1,4 @@ -# This test executes the specified CWL workflow +# This test executes the specified DAG workflow # using the CWL DAG (classic or modular) submitted through the Airflow API. # The workflow parameters are contained in a YAML file which is venue-dependent. # The CWL DAGs (classic and modular) must already be deployed in Airflow, @@ -19,7 +19,9 @@ # DAG parameters are venue specific CWL_DAG_ID = "cwl_dag" CWL_DAG_MODULAR_ID = "cwl_dag_modular" +KARPENTER_DAG_ID = "karpenter_test" DAG_PARAMETERS = { + KARPENTER_DAG_ID: {"KARPENTER": {"placeholder": 1}}, CWL_DAG_MODULAR_ID: { "EMIT": { "stac_json": { @@ -96,7 +98,7 @@ } -@scenario(FEATURE_FILE, "Successful execution of a CWL workflow with the Airflow API") +@scenario(FEATURE_FILE, "Successful execution of a DAG workflow with the Airflow API") def test_successful_execution_of_a_cwl_workflow_with_the_airflow_api(): pass @@ -118,13 +120,16 @@ def trigger_dag(airflow_api_url, fetch_token, venue, test_case, test_dag): try: # configuration common to all DAGs - job_config = { - "conf": { - "log_level": f'{DAG_PARAMETERS[test_dag][test_case]["log_level"]}', - "request_storage": f'{DAG_PARAMETERS[test_dag][test_case]["request_storage"]}', - "request_instance_type": f'{DAG_PARAMETERS[test_dag][test_case]["request_instance_type"]}', + if test_dag == KARPENTER_DAG_ID: + job_config = {"conf": DAG_PARAMETERS[KARPENTER_DAG_ID]} + else: + job_config = { + "conf": { + "log_level": f'{DAG_PARAMETERS[test_dag][test_case]["log_level"]}', + "request_storage": f'{DAG_PARAMETERS[test_dag][test_case]["request_storage"]}', + "request_instance_type": f'{DAG_PARAMETERS[test_dag][test_case]["request_instance_type"]}', + } } - } # configuration specific to CWL_DAG if test_dag == CWL_DAG_ID: From db2ecce5aa3140f9963169b332e49f218ff715e3 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Sat, 26 Jul 2025 04:47:02 -0600 Subject: [PATCH 4/8] Ading Karpenter test to the Airflow API tests --- .../test_cwl_workflows_with_airflow_api.py | 12 ++++---- .../test_cwl_workflows_with_ogc_api.py | 28 +++++++++++++------ 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py b/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py index cf3376fc..8787623e 100644 --- a/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py +++ b/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py @@ -1,9 +1,7 @@ -# This test executes the specified DAG workflow -# using the CWL DAG (classic or modular) submitted through the Airflow API. -# The workflow parameters are contained in a YAML file which is venue-dependent. -# The CWL DAGs (classic and modular) must already be deployed in Airflow, -# and it is invoked via the Airflow API. -# The CWL task is executed via a KubernetesPodOperator on a worker node +# This test executes the specified DAG workflow using the Airflow API. +# The workflow parameters are contained in a YAML file which may be venue-dependent. +# The DAG must already be deployed in Airflow. +# The DAG tasks are executed via a KubernetesPodOperator on a worker node # that is dynamically provisioned by Karpenter. import json from pathlib import Path @@ -99,7 +97,7 @@ @scenario(FEATURE_FILE, "Successful execution of a DAG workflow with the Airflow API") -def test_successful_execution_of_a_cwl_workflow_with_the_airflow_api(): +def test_successful_execution_of_a_dag_workflow_with_the_airflow_api(): pass diff --git a/unity-test/system/integration/step_defs/test_cwl_workflows_with_ogc_api.py b/unity-test/system/integration/step_defs/test_cwl_workflows_with_ogc_api.py index 8fe2f426..c55cee9b 100644 --- a/unity-test/system/integration/step_defs/test_cwl_workflows_with_ogc_api.py +++ b/unity-test/system/integration/step_defs/test_cwl_workflows_with_ogc_api.py @@ -1,9 +1,9 @@ -# This test executes the specified CWL workflow -# using the CWL DAG OGC process submitted through the OGC API. -# The workflow parameters are contained in a YAML file which is venue-dependent. -# The CWL DAG OGC process must already be deployed in Airflow, +# This test executes the specified DAG workflow +# using the DAG OGC process submitted through the OGC API. +# The workflow parameters are contained in a YAML file which may be venue-dependent. +# The DAG OGC process must already be deployed in Airflow, # and it is invoked via the OGC API. -# The CWL task is executed via a KubernetesPodOperator on a worker node +# The DAG tasks are executed via a KubernetesPodOperator on a worker node # that is dynamically provisioned by Karpenter. import json from pathlib import Path @@ -21,6 +21,13 @@ # DAG parameters are venue specific CWL_DAG_ID = "cwl_dag" CWL_DAG_MODULAR_ID = "cwl_dag_modular" +KARPENTER_DAG_ID = "karpenter_test" +GENERIC_DAG_DATA = { + "KARPENTER": { + "inputs": {"placeholder": 1}, + "outputs": {"result": {"transmissionMode": "reference"}}, + }, +} CWL_DAG_DATA = { "EMIT": { "inputs": { @@ -113,8 +120,8 @@ } -@scenario(FEATURE_FILE, "Successful execution of a CWL workflow with the OGC API") -def test_successful_execution_of_a_cwl_workflow_with_the_ogc_api(): +@scenario(FEATURE_FILE, "Successful execution of a DAG workflow with the OGC API") +def test_successful_execution_of_a_dag_workflow_with_the_ogc_api(): pass @@ -124,7 +131,9 @@ def api_up_and_running(ogc_processes): @when(parsers.parse("I trigger a {test_case} OGC job for the {test_dag} OGC process"), target_fixture="job") -def trigger_process(cwl_dag_process, cwl_dag_modular_process, venue, test_case, test_dag): +def trigger_process( + cwl_dag_process, cwl_dag_modular_process, karpenter_dag_process, venue, test_case, test_dag +): # check that this test_case and test_dag are enabled for the specified venue ogc_process = None @@ -140,6 +149,9 @@ def trigger_process(cwl_dag_process, cwl_dag_modular_process, venue, test_case, payload = CWL_DAG_MODULAR_DATA[test_case] payload["inputs"]["stac_json"] = payload["inputs"]["stac_json"][venue] payload["inputs"]["process_args"] = payload["inputs"]["process_args"][venue] + elif test_dag == KARPENTER_DAG_ID: + ogc_process = karpenter_dag_process + payload = GENERIC_DAG_DATA[test_case] print(ogc_process) assert ogc_process is not None From 4828093aa3a21bbc8271457d9acc5868381e4ce7 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Sat, 26 Jul 2025 04:54:13 -0600 Subject: [PATCH 5/8] Adding Ops workflow --- .github/workflows/integration_tests.yml | 78 ++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index 377438c9..c93d7586 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -21,7 +21,13 @@ on: MCP_VENUE_TEST_OGC_PROCESSES_API_ENDPOINT: description: "Base URL for the OGC endpoint in MCP Venue Test (i.e. https://abcdef12345.execute-api.us-west-2.amazonaws.com/test/ogc/api)" type: string - # TODO: add MCP_VENUE_OPS inputs + MCP_VENUE_OPS_AIRFLOW_API_ENDPOINT: + description: "Base URL for the Airflow API endpoint in MCP Venue Ops (i.e. https://abcdef12345.execute-api.us-west-2.amazonaws.com/test/sps/api/v1)" + type: string + MCP_VENUE_OPS_OGC_PROCESSES_API_ENDPOINT: + description: "Base URL for the OGC endpoint in MCP Venue Ops (i.e. https://abcdef12345.execute-api.us-west-2.amazonaws.com/test/ogc/api)" + type: string + jobs: Dev-Venue-Airflow-API: @@ -94,6 +100,41 @@ jobs: exit 1 fi + Ops-Venue-Airflow-API: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup + uses: ./.github/actions/setup-action + continue-on-error: false + + - name: MCP Venue Ops - Integration tests with Airflow API + id: mcp_venue_ops_integration_tests_with_airflow_api + continue-on-error: true + env: + UNITY_USER: ${{ secrets.MCP_VENUE_OPS_UNITY_USERNAME }} + UNITY_PASSWORD: ${{ secrets.MCP_VENUE_OPS_UNITY_PASSWORD }} + UNITY_CLIENT_ID: ${{ secrets.MCP_VENUE_OPS_UNITY_CLIENTID }} + run: | + pytest -vv -s --gherkin-terminal-reporter \ + unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py \ + --venue="prod" \ + --airflow-endpoint=${{ github.event.inputs.MCP_VENUE_OPS_AIRFLOW_API_ENDPOINT || vars.MCP_VENUE_OPS_AIRFLOW_API_ENDPOINT }} \ + --ogc-processes-endpoint=${{ github.event.inputs.MCP_VENUE_OPS_OGC_PROCESSES_API_ENDPOINT || vars.MCP_VENUE_OPS_OGC_PROCESSES_API_ENDPOINT }} + + - name: Check Tests Results + if: always() + run: | + tests_status=${{ steps.mcp_venue_ops_integration_tests_with_airflow_api.outcome }} + echo "Tests Status: $tests_status" + if [ "$tests_status" != "success" ]; then + echo "Integration Tests with Airflow API on MCP Venue Ops failed." + exit 1 + fi + Dev-Venue-OGC-API: runs-on: ubuntu-latest @@ -163,3 +204,38 @@ jobs: echo "Integration Tests with OGC API on MCP Venue Test failed." exit 1 fi + + Ops-Venue-OGC-API: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup + uses: ./.github/actions/setup-action + continue-on-error: false + + - name: MCP Venue Ops - Integration tests with OGC API + id: mcp_venue_ops_integration_tests_with_ogc_api + continue-on-error: true + env: + UNITY_USER: ${{ secrets.MCP_VENUE_OPS_UNITY_USERNAME }} + UNITY_PASSWORD: ${{ secrets.MCP_VENUE_OPS_UNITY_PASSWORD }} + UNITY_CLIENT_ID: ${{ secrets.MCP_VENUE_OPS_UNITY_CLIENTID }} + run: | + pytest -vv -s --gherkin-terminal-reporter \ + unity-test/system/integration/step_defs/test_cwl_workflows_with_ogc_api.py \ + --venue="prod" \ + --airflow-endpoint=${{ github.event.inputs.MCP_VENUE_OPS_AIRFLOW_API_ENDPOINT || vars.MCP_VENUE_OPS_AIRFLOW_API_ENDPOINT }} \ + --ogc-processes-endpoint=${{ github.event.inputs.MCP_VENUE_OPS_OGC_PROCESSES_API_ENDPOINT || vars.MCP_VENUE_OPS_OGC_PROCESSES_API_ENDPOINT }} + + - name: Check Tests Results + if: always() + run: | + tests_status=${{ steps.mcp_venue_ops_integration_tests_with_ogc_api.outcome }} + echo "Tests Status: $tests_status" + if [ "$tests_status" != "success" ]; then + echo "Integration Tests with OGC API on MCP Venue Ops failed." + exit 1 + fi From 8fd085b7e3b9b0fb173c6b1d64645703b974e9f0 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Sat, 26 Jul 2025 05:10:02 -0600 Subject: [PATCH 6/8] Enabling all tests --- .../features/cwl_workflows_with_airflow_api.feature | 8 ++++---- .../features/cwl_workflows_with_ogc_api.feature | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/unity-test/system/integration/features/cwl_workflows_with_airflow_api.feature b/unity-test/system/integration/features/cwl_workflows_with_airflow_api.feature index 67da27e1..00d89a9d 100644 --- a/unity-test/system/integration/features/cwl_workflows_with_airflow_api.feature +++ b/unity-test/system/integration/features/cwl_workflows_with_airflow_api.feature @@ -14,7 +14,7 @@ Feature: Execute DAG workflows using the Airflow API Examples: | test_case | test_dag | | KARPENTER | karpenter_test | -# | EMIT | cwl_dag | -# | SBG_PREPROCESS | cwl_dag | -# | EMIT | cwl_dag_modular | -# | SBG_PREPROCESS | cwl_dag_modular | + | EMIT | cwl_dag | + | SBG_PREPROCESS | cwl_dag | + | EMIT | cwl_dag_modular | + | SBG_PREPROCESS | cwl_dag_modular | diff --git a/unity-test/system/integration/features/cwl_workflows_with_ogc_api.feature b/unity-test/system/integration/features/cwl_workflows_with_ogc_api.feature index 7ed760f1..974c07f1 100644 --- a/unity-test/system/integration/features/cwl_workflows_with_ogc_api.feature +++ b/unity-test/system/integration/features/cwl_workflows_with_ogc_api.feature @@ -14,7 +14,7 @@ Feature: Execute DAG workflows using the OGC API Examples: | test_case | test_dag | | KARPENTER | karpenter_test | -# | EMIT | cwl_dag | -# | SBG_PREPROCESS | cwl_dag | -# | EMIT | cwl_dag_modular | -# | SBG_PREPROCESS | cwl_dag_modular | + | EMIT | cwl_dag | + | SBG_PREPROCESS | cwl_dag | + | EMIT | cwl_dag_modular | + | SBG_PREPROCESS | cwl_dag_modular | From 8cf4746228b34d518696ac03acb9ed1d2fd93b55 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Sun, 27 Jul 2025 02:47:34 -0600 Subject: [PATCH 7/8] Using r7i.2xlarge for SBG_PREPROCESS CWL modular DAG --- .../step_defs/test_cwl_workflows_with_airflow_api.py | 2 +- .../integration/step_defs/test_cwl_workflows_with_ogc_api.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py b/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py index 8787623e..e1c2b7bb 100644 --- a/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py +++ b/unity-test/system/integration/step_defs/test_cwl_workflows_with_airflow_api.py @@ -38,7 +38,7 @@ "process_workflow": "http://awslbdockstorestack-lb-1429770210.us-west-2.elb.amazonaws.com:9998/api/ga4gh/trs/v2/tools/%23workflow%2Fdockstore.org%2Fedwinsarkissian%2FSBG-unity-preprocess-mod/versions/4/PLAIN-CWL/descriptor/%2Fprocess.cwl", "process_args": {"dev": json.dumps({})}, "log_level": "INFO", - "request_instance_type": "t3.2xlarge", + "request_instance_type": "r7i.2xlarge", "request_storage": "100Gi", }, "SBG_ISOFIT": { diff --git a/unity-test/system/integration/step_defs/test_cwl_workflows_with_ogc_api.py b/unity-test/system/integration/step_defs/test_cwl_workflows_with_ogc_api.py index c55cee9b..dc4e5ee3 100644 --- a/unity-test/system/integration/step_defs/test_cwl_workflows_with_ogc_api.py +++ b/unity-test/system/integration/step_defs/test_cwl_workflows_with_ogc_api.py @@ -99,7 +99,7 @@ "process_workflow": "http://awslbdockstorestack-lb-1429770210.us-west-2.elb.amazonaws.com:9998/api/ga4gh/trs/v2/tools/%23workflow%2Fdockstore.org%2Fedwinsarkissian%2FSBG-unity-preprocess-mod/versions/4/PLAIN-CWL/descriptor/%2Fprocess.cwl", "process_args": {"dev": json.dumps({})}, "log_level": "INFO", - "request_instance_type": "t3.2xlarge", + "request_instance_type": "r7i.2xlarge", "request_storage": "100Gi", }, "outputs": {"result": {"transmissionMode": "reference"}}, From d97485514df58d3e3e108c6606245ef0f7612a71 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 31 Jul 2025 04:30:03 -0600 Subject: [PATCH 8/8] Working version of Karpenter Test --- .../features/karpenter_test_workflow.feature | 11 +++ .../step_defs/test_karpenter_test_workflow.py | 87 +++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 unity-test/system/integration/features/karpenter_test_workflow.feature create mode 100644 unity-test/system/integration/step_defs/test_karpenter_test_workflow.py diff --git a/unity-test/system/integration/features/karpenter_test_workflow.feature b/unity-test/system/integration/features/karpenter_test_workflow.feature new file mode 100644 index 00000000..75a8ab12 --- /dev/null +++ b/unity-test/system/integration/features/karpenter_test_workflow.feature @@ -0,0 +1,11 @@ +Feature: Airflow Karpenter Test Workflow + + As an SPS user + I want to ensure that the system has been successfully deployed to a given venue + So that I can execute workflows as DAGs while provisioning nodes as needed + + Scenario: Execute the Karpenter Test Workflow + Given the Airflow API is up and running + When I trigger a run for the Karpenter Test DAG + Then I receive a response with status code 200 + And I see an eventual successful DAG run diff --git a/unity-test/system/integration/step_defs/test_karpenter_test_workflow.py b/unity-test/system/integration/step_defs/test_karpenter_test_workflow.py new file mode 100644 index 00000000..92529fbb --- /dev/null +++ b/unity-test/system/integration/step_defs/test_karpenter_test_workflow.py @@ -0,0 +1,87 @@ +# This test executes the "Karpenter Test" workflow +# which is a DAG composed of 3 "dummy" tasks, +# each executing on a different node type. +# A successful test will confirm +# that Airflow, Keda and Karpenter are working properly, +# for the given SPS deployment and venue. +from pathlib import Path + +import backoff +import requests +from pytest_bdd import given, scenario, then, when + +FILE_PATH = Path(__file__) +FEATURES_DIR = FILE_PATH.parent.parent / "features" +FEATURE_FILE: Path = FEATURES_DIR / "karpenter_test_workflow.feature" + +# DAG parameters are venue specific +DAG_ID = "karpenter_test" +DAG_PARAMETERS = {"placeholder": 1} + + +@scenario(FEATURE_FILE, "Execute the Karpenter Test Workflow") +def test_execute_karpenter_test_workflow(): + pass + + +@given("the Airflow API is up and running") +def api_up_and_running(): + pass + + +@when("I trigger a run for the Karpenter Test DAG", target_fixture="response") +def trigger_dag(airflow_api_url, fetch_token): + + headers = {"Authorization": f"Bearer {fetch_token}", "Content-Type": "application/json"} + job_config = {"conf": DAG_PARAMETERS} + + response = requests.post( + f"{airflow_api_url}/dags/{DAG_ID}/dagRuns", + headers=headers, + json=job_config, + # nosec + verify=False, + ) + return response + + +@then("I receive a response with status code 200") +def check_status_code(response): + assert response.status_code == 200, f"Expected status code 200, but got {response.status_code}" + + +def check_failed(e): + if isinstance(e, AssertionError): + return "failed" in e.args[0] + return False + + +@then("I see an eventual successful DAG run") +@backoff.on_exception( + backoff.constant, + (AssertionError, requests.exceptions.HTTPError), + max_time=7200, + giveup=check_failed, + jitter=None, + interval=5, +) +def poll_dag_run(response, airflow_api_url, fetch_token): + + headers = {"Authorization": f"Bearer {fetch_token}"} + + if response is not None: + dag_json = response.json() + dag_run_response = requests.get( + f"""{airflow_api_url}/dags/{DAG_ID}/dagRuns/{dag_json["dag_run_id"]}""", + headers=headers, + # nosec + verify=False, + ) + assert dag_run_response.status_code == 200, ( + f"Expected status code 2" f"00, but got {response.status_code}" + ) + json = dag_run_response.json() + assert "state" in json, 'Expected "state" element in response' + assert json["state"] == "success" + else: + pass