diff --git a/import-automation/workflow/aggregation-helper/main.py b/import-automation/workflow/aggregation-helper/main.py
new file mode 100644
index 0000000000..728ec4faaa
--- /dev/null
+++ b/import-automation/workflow/aggregation-helper/main.py
@@ -0,0 +1,94 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functions_framework
+from google.cloud import bigquery
+import logging
+from flask import jsonify
+import os
+
+# Initialize BigQuery Client
+try:
+    bq_client = bigquery.Client()
+except Exception as e:
+    logging.warning(f"Failed to initialize BigQuery client: {e}")
+    bq_client = None
+
+BQ_DATASET_ID = os.environ.get('BQ_DATASET_ID')
+SPANNER_PROJECT_ID = os.environ.get('SPANNER_PROJECT_ID')
+SPANNER_INSTANCE_ID = os.environ.get('SPANNER_INSTANCE_ID')
+SPANNER_DATABASE_ID = os.environ.get('SPANNER_DATABASE_ID')
+GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID')
+
+
+@functions_framework.http
+def aggregation_helper(request):
+    """
+    HTTP Cloud Function that runs a BQ query for each entry in 'importList'.
+    """
+    if not bq_client:
+        return ('BigQuery client not initialized', 500)
+
+    request_json = request.get_json(silent=True)
+    if not request_json:
+        return ('Request is not valid JSON', 400)
+
+    import_list = request_json.get('importList')
+    if not import_list:
+        return ("'importList' parameter is missing", 400)
+
+    logging.info(f"Received request for importList: {import_list}")
+
+    results = []
+
+    try:
+        for import_item in import_list:
+            import_name = import_item.get('importName')
+            if not import_name:
+                logging.warning(
+                    f"Skipping item without importName: {import_item}")
+                continue
+
+            query = None
+            # Define specific queries based on importName
+            if "india_census" in import_name:
+                # Placeholder for India Census specific logic
+                query = """
+                    SELECT @import_name as import_name, CURRENT_TIMESTAMP() as execution_time
+                """
+            elif "us_census" in import_name:
+                # Placeholder for US Census specific logic
+                query = """
+                    SELECT @import_name as import_name, CURRENT_TIMESTAMP() as execution_time
+                """
+            else:
+                logging.info(
+                    f"No specific aggregation logic for import: {import_name}")
+                continue
+
+            if query:
+                job_config = bigquery.QueryJobConfig(query_parameters=[
+                    bigquery.ScalarQueryParameter("import_name", "STRING",
+                                                  import_name),
+                ])
+                query_job = bq_client.query(query, job_config=job_config)
+                query_results = query_job.result()
+                for row in query_results:
+                    results.append(dict(row))
+
+        return jsonify({"status": "success", "results": results}), 200
+
+    except Exception as e:
+        logging.error(f"Aggregation failed: {e}")
+        return (f"Aggregation failed: {str(e)}", 500)
diff --git a/import-automation/workflow/aggregation-helper/requirements.txt b/import-automation/workflow/aggregation-helper/requirements.txt
new file mode 100644
index 0000000000..bfd6f7c4e4
--- /dev/null
+++ b/import-automation/workflow/aggregation-helper/requirements.txt
@@ -0,0 +1,2 @@
+functions-framework==3.*
+google-cloud-bigquery
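Note on the request contract: aggregation_helper expects a JSON body whose importList is a list of objects, each carrying an importName. A minimal local smoke test, assuming the function is served via functions-framework --target aggregation_helper on the default port 8080; the import names are hypothetical, and requests is a test-only dependency not listed in requirements.txt:

    import requests

    # Payload shape mirrors what the workflow's run_aggregation step sends:
    # a list of import dicts, each with an "importName" key.
    payload = {
        "importList": [
            {"importName": "india_census_2011"},  # matches the "india_census" branch
            {"importName": "us_census_acs5"},     # matches the "us_census" branch
            {"importName": "other_import"},       # skipped: no aggregation logic
        ]
    }

    resp = requests.post("http://localhost:8080", json=payload)
    print(resp.status_code, resp.json())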
diff --git a/import-automation/workflow/cloudbuild.yaml b/import-automation/workflow/cloudbuild.yaml
index d76c9c2c76..a205fe6b40 100644
--- a/import-automation/workflow/cloudbuild.yaml
+++ b/import-automation/workflow/cloudbuild.yaml
@@ -22,6 +22,7 @@ substitutions:
   _GCS_BUCKET_ID: 'datcom-prod-imports'
   _LOCATION: 'us-central1'
   _GCS_MOUNT_BUCKET: 'datcom-volume-mount'
+  _BQ_DATASET_ID: 'datacommons'
 
 steps:
 - id: 'import-automation-workflow'
@@ -39,6 +40,11 @@ steps:
   args: ['functions', 'deploy', 'spanner-ingestion-helper', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}']
   dir: 'import-automation/workflow'
 
+- id: 'import-aggregation-helper'
+  name: 'gcr.io/cloud-builders/gcloud'
+  args: ['functions', 'deploy', 'import-aggregation-helper', '--runtime', 'python312', '--source', 'aggregation-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'aggregation_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION},BQ_DATASET_ID=${_BQ_DATASET_ID}']
+  dir: 'import-automation/workflow'
+
 - id: 'import-automation-helper'
   name: 'gcr.io/cloud-builders/gcloud'
   args: ['functions', 'deploy', 'import-automation-helper', '--runtime', 'python312', '--source', 'import-helper', '--trigger-topic', 'import-automation-trigger' , '--entry-point', 'handle_feed_event', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},WORKFLOW_ID=spanner-ingestion-workflow']
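Note: the new _BQ_DATASET_ID substitution reaches the function as the BQ_DATASET_ID environment variable, but the placeholder queries in main.py do not reference it yet. A hedged sketch of how a real aggregation query might consume it once the placeholders are filled in; the import_stats table name is invented for illustration:

    # Hypothetical replacement for one of the placeholder queries; assumes a
    # table named import_stats exists in the configured dataset.
    query = f"""
        SELECT @import_name AS import_name, COUNT(*) AS row_count
        FROM `{bq_client.project}.{BQ_DATASET_ID}.import_stats`
        WHERE import_name = @import_name
    """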
diff --git a/import-automation/workflow/spanner-ingestion-workflow.yaml b/import-automation/workflow/spanner-ingestion-workflow.yaml
index 41b3df79b7..db4247728a 100644
--- a/import-automation/workflow/spanner-ingestion-workflow.yaml
+++ b/import-automation/workflow/spanner-ingestion-workflow.yaml
@@ -10,14 +10,14 @@ main:
       - dataflow_gcs_path: 'gs://datcom-templates/templates/flex/ingestion.json'
       - location: '${sys.get_env("LOCATION")}'
       - spanner_database_id: '${sys.get_env("SPANNER_DATABASE_ID")}'
-      - ingestion_helper: "spanner-ingestion-helper"
-      - function_url: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + ingestion_helper}
+      - ingestion_function: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + "spanner-ingestion-helper"}
+      - aggregation_function: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + "import-aggregation-helper"}
       - import_list: ${default(map.get(args, "imports"), [])}
       - execution_error: null
   - acquire_ingestion_lock:
       call: http.post
       args:
-        url: ${function_url}
+        url: ${ingestion_function}
         auth:
           type: OIDC
         body:
@@ -31,7 +31,7 @@ main:
   - get_import_list:
       call: http.post
       args:
-        url: ${function_url}
+        url: ${ingestion_function}
         auth:
           type: OIDC
         body:
@@ -49,10 +49,18 @@ main:
             spanner_database_id: ${spanner_database_id}
             wait_period: ${wait_period}
           result: dataflow_job_id
+      - run_aggregation:
+          call: http.post
+          args:
+            url: ${aggregation_function}
+            auth:
+              type: OIDC
+            body:
+              importList: ${import_info.body}
       - update_ingestion_status:
           call: http.post
           args:
-            url: ${function_url}
+            url: ${ingestion_function}
             auth:
               type: OIDC
             body:
@@ -70,7 +78,7 @@ main:
   - release_ingestion_lock:
       call: http.post
       args:
-        url: ${function_url}
+        url: ${ingestion_function}
         auth:
           type: OIDC
         body:
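Note: run_aggregation sits between the Dataflow ingestion step and update_ingestion_status, so a non-2xx response from the aggregation function raises an exception and aborts the workflow before status is updated or the lock released, unless an enclosing step already catches it. If aggregation should be best-effort, a hedged sketch of wrapping the call in Workflows' try/except (the log step name is invented for illustration):

    - run_aggregation:
        try:
          call: http.post
          args:
            url: ${aggregation_function}
            auth:
              type: OIDC
            body:
              importList: ${import_info.body}
        except:
          as: e
          steps:
            - log_aggregation_failure:
                call: sys.log
                args:
                  severity: WARNING
                  text: '${"Aggregation failed: " + json.encode_to_string(e)}'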