94 changes: 94 additions & 0 deletions import-automation/workflow/aggregation-helper/main.py
@@ -0,0 +1,94 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functions_framework
from google.cloud import bigquery
import logging
from flask import jsonify
import os

# Initialize BigQuery Client
try:
    bq_client = bigquery.Client()
except Exception as e:
    logging.warning(f"Failed to initialize BigQuery client: {e}")
Comment on lines +24 to +25 (Contributor, medium):

Catching a broad Exception can hide bugs and make debugging difficult. It's better to catch more specific exceptions. For BigQuery client initialization, consider catching google.auth.exceptions.DefaultCredentialsError or other specific exceptions from the google-cloud-bigquery library if you know what might go wrong.
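For illustration only, a narrower handler might look like the sketch below. It assumes missing Application Default Credentials are the failure mode of interest; DefaultCredentialsError comes from google.auth, and whether it is the right exception to catch depends on the deployment environment.

import logging

from google.auth.exceptions import DefaultCredentialsError
from google.cloud import bigquery

bq_client = None
try:
    bq_client = bigquery.Client()
except DefaultCredentialsError as e:
    # No Application Default Credentials available (e.g. running locally
    # without `gcloud auth application-default login`).
    logging.warning(f"BigQuery credentials not found: {e}")
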
    bq_client = None

BQ_DATASET_ID = os.environ.get('BQ_DATASET_ID')
Contributor (medium):

The environment variable BQ_DATASET_ID is loaded but it's never used in the code. This can be confusing for future developers. Please either use it in your queries (e.g., to specify the dataset for tables) or remove it if it's not needed.
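For illustration, a sketch of how BQ_DATASET_ID could be wired into the queries below; the dataset-qualified table name import_runs is hypothetical and not part of this change.

# Hypothetical example: use BQ_DATASET_ID to fully qualify a table reference.
# `import_runs` is an invented table name for illustration only.
query = f"""
    SELECT @import_name AS import_name, CURRENT_TIMESTAMP() AS execution_time
    FROM `{bq_client.project}.{BQ_DATASET_ID}.import_runs`
    WHERE import_name = @import_name
"""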

SPANNER_PROJECT_ID = os.environ.get('SPANNER_PROJECT_ID')
SPANNER_INSTANCE_ID = os.environ.get('SPANNER_INSTANCE_ID')
SPANNER_DATABASE_ID = os.environ.get('SPANNER_DATABASE_ID')
GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID')


@functions_framework.http
def aggregation_helper(request):
"""
HTTP Cloud Function that takes importName and runs a BQ query.
"""
if not bq_client:
return ('BigQuery client not initialized', 500)

request_json = request.get_json(silent=True)
if not request_json:
return ('Request is not a valid JSON', 400)

import_list = request_json.get('importList')
if not import_list:
return ("'importList' parameter is missing", 400)

logging.info(f"Received request for importList: {import_list}")

results = []

try:
for import_item in import_list:
import_name = import_item.get('importName')
if not import_name:
logging.warning(
f"Skipping item without importName: {import_item}")
continue

query = None
# Define specific queries based on importName
if "india_census" in import_name:
# Placeholder for India Census specific logic
query = """
SELECT @import_name as import_name, CURRENT_TIMESTAMP() as execution_time
"""
elif "us_census" in import_name:
# Placeholder for US Census specific logic
query = """
SELECT @import_name as import_name, CURRENT_TIMESTAMP() as execution_time
"""
else:
logging.info(
f"No specific aggregation logic for import: {import_name}")
continue

if query:
job_config = bigquery.QueryJobConfig(query_parameters=[
bigquery.ScalarQueryParameter("import_name", "STRING",
import_name),
])
query_job = bq_client.query(query, job_config=job_config)
query_results = query_job.result()
for row in query_results:
results.append(dict(row))

return jsonify({"status": "success"}), 200

except Exception as e:
logging.error(f"Aggregation failed: {e}")
return (f"Aggregation failed: {str(e)}", 500)
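For reference, a sketch of how the expected request shape can be exercised against a local instance (functions-framework --target aggregation_helper serves the function on port 8080 by default). The import names below are placeholders chosen to hit each branch.

import requests

url = "http://localhost:8080/"  # local Functions Framework endpoint

payload = {
    "importList": [
        {"importName": "india_census_2011"},  # matches the india_census branch
        {"importName": "us_census_acs"},      # matches the us_census branch
        {"importName": "something_else"},     # skipped: no aggregation logic
    ]
}

resp = requests.post(url, json=payload)
print(resp.status_code, resp.text)  # expect: 200 {"status": "success"}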
2 changes: 2 additions & 0 deletions import-automation/workflow/aggregation-helper/requirements.txt
@@ -0,0 +1,2 @@
functions-framework==3.*
google-cloud-bigquery
Contributor (medium):

It's a good practice to pin dependency versions in requirements.txt to ensure reproducible builds and prevent unexpected breakages from library updates. Please consider specifying a version for google-cloud-bigquery.

Suggested change:
google-cloud-bigquery==3.17.2

6 changes: 6 additions & 0 deletions import-automation/workflow/cloudbuild.yaml
@@ -22,6 +22,7 @@ substitutions:
   _GCS_BUCKET_ID: 'datcom-prod-imports'
   _LOCATION: 'us-central1'
   _GCS_MOUNT_BUCKET: 'datcom-volume-mount'
+  _BQ_DATASET_ID: 'datacommons'

steps:
  - id: 'import-automation-workflow'
@@ -39,6 +40,11 @@
    args: ['functions', 'deploy', 'spanner-ingestion-helper', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}']
    dir: 'import-automation/workflow'

+  - id: 'import-aggregation-helper'
+    name: 'gcr.io/cloud-builders/gcloud'
+    args: ['functions', 'deploy', 'import-aggregation-helper', '--runtime', 'python312', '--source', 'aggregation-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'aggregation_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION},BQ_DATASET_ID=${_BQ_DATASET_ID}']
Contributor (medium):

The import-aggregation-helper function is being configured with several environment variables that it does not appear to use, such as SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DATABASE_ID, GCS_BUCKET_ID, and LOCATION. To improve security and reduce configuration complexity, it's best practice to only provide the environment variables that are actually required by the function. The aggregation-helper function only seems to require BQ_DATASET_ID.

Suggested change:
args: ['functions', 'deploy', 'import-aggregation-helper', '--runtime', 'python312', '--source', 'aggregation-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'aggregation_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'BQ_DATASET_ID=${_BQ_DATASET_ID}']

+    dir: 'import-automation/workflow'

  - id: 'import-automation-helper'
    name: 'gcr.io/cloud-builders/gcloud'
    args: ['functions', 'deploy', 'import-automation-helper', '--runtime', 'python312', '--source', 'import-helper', '--trigger-topic', 'import-automation-trigger', '--entry-point', 'handle_feed_event', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},WORKFLOW_ID=spanner-ingestion-workflow']
20 changes: 14 additions & 6 deletions import-automation/workflow/spanner-ingestion-workflow.yaml
@@ -10,14 +10,14 @@ main:
          - dataflow_gcs_path: 'gs://datcom-templates/templates/flex/ingestion.json'
          - location: '${sys.get_env("LOCATION")}'
          - spanner_database_id: '${sys.get_env("SPANNER_DATABASE_ID")}'
-         - ingestion_helper: "spanner-ingestion-helper"
-         - function_url: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + ingestion_helper}
+         - ingestion_function: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + "spanner-ingestion-helper"}
+         - aggregation_function: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + "import-aggregation-helper"}
          - import_list: ${default(map.get(args, "imports"), [])}
          - execution_error: null
    - acquire_ingestion_lock:
        call: http.post
        args:
-         url: ${function_url}
+         url: ${ingestion_function}
          auth:
            type: OIDC
          body:
@@ -31,7 +31,7 @@
    - get_import_list:
        call: http.post
        args:
-         url: ${function_url}
+         url: ${ingestion_function}
          auth:
            type: OIDC
          body:
@@ -49,10 +49,18 @@
            spanner_database_id: ${spanner_database_id}
            wait_period: ${wait_period}
        result: dataflow_job_id
+   - run_aggregation:
+       call: http.post
+       args:
+         url: ${aggregation_function}
+         auth:
+           type: OIDC
+         body:
+           importList: ${import_info.body}
    - update_ingestion_status:
        call: http.post
        args:
-         url: ${function_url}
+         url: ${ingestion_function}
          auth:
            type: OIDC
          body:
@@ -70,7 +78,7 @@
    - release_ingestion_lock:
        call: http.post
        args:
-         url: ${function_url}
+         url: ${ingestion_function}
          auth:
            type: OIDC
          body:
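To exercise the new run_aggregation step end to end, the workflow can be run with an explicit imports argument. The sketch below assumes the google-cloud-workflows client library; the project ID and the import entry are placeholders, and the shape of each entry is an assumption based on what aggregation_helper reads.

import json

from google.cloud.workflows import executions_v1

client = executions_v1.ExecutionsClient()

# Placeholder project ID; location matches _LOCATION in cloudbuild.yaml,
# and the workflow name matches WORKFLOW_ID set for import-automation-helper.
parent = client.workflow_path("my-gcp-project", "us-central1",
                              "spanner-ingestion-workflow")

execution = executions_v1.Execution(
    argument=json.dumps({"imports": [{"importName": "india_census_2011"}]}))

response = client.create_execution(parent=parent, execution=execution)
print(f"Started execution: {response.name}")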