import-automation/workflow/cloudbuild.yaml (9 changes: 5 additions & 4 deletions)
@@ -26,21 +26,22 @@ substitutions:
steps:
- id: 'import-automation-workflow'
name: 'gcr.io/cloud-builders/gcloud'
args: ['workflows', 'deploy', 'import-automation-workflow', '--source', 'import-automation-workflow.yaml', '--project', '${_PROJECT_ID}', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}']
args: ['workflows', 'deploy', 'import-automation-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}']
dir: 'import-automation/workflow'

- id: 'spanner-ingestion-workflow'
name: 'gcr.io/cloud-builders/gcloud'
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--source', 'spanner-ingestion-workflow.yaml', '--project', '${_PROJECT_ID}', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}']
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}']
dir: 'import-automation/workflow'

- id: 'spanner-ingestion-helper'
name: 'gcr.io/cloud-builders/gcloud'
args: ['functions', 'deploy', 'spanner-ingestion-helper', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}']
args: ['functions', 'deploy', 'spanner-ingestion-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}']
dir: 'import-automation/workflow'

# gcloud pubsub subscriptions create import-automation-sub --topic=import-automation-trigger --message-filter='attributes.transfer_status="TRANSFER_COMPLETED"' --push-endpoint=https://us-central1-datcom-import-automation-prod.cloudfunctions.net/import-automation-helper --push-auth-service-account=965988403328-compute@developer.gserviceaccount.com --project=datcom-import-automation-prod
- id: 'import-automation-helper'
name: 'gcr.io/cloud-builders/gcloud'
args: ['functions', 'deploy', 'import-automation-helper', '--runtime', 'python312', '--source', 'import-helper', '--trigger-topic', 'import-automation-trigger' , '--entry-point', 'handle_feed_event', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},WORKFLOW_ID=spanner-ingestion-workflow']
args: ['functions', 'deploy', 'import-automation-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'import-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'handle_feed_event', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION}']
dir: 'import-automation/workflow'
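
The `import-automation-helper` step now deploys with `--trigger-http` rather than `--trigger-topic`, so Pub/Sub messages must reach it through a push subscription like the one in the gcloud comment above. A rough sketch of creating that subscription with the Python client, reusing the topic, endpoint, and service account from that comment (assumptions that may differ per environment):

```python
# Sketch only: recreate the push subscription that feeds the HTTP-triggered
# import-automation-helper. Values are taken from the gcloud comment above
# and are assumptions for other environments.
from google.cloud import pubsub_v1

project = "datcom-import-automation-prod"
topic = f"projects/{project}/topics/import-automation-trigger"
subscription = f"projects/{project}/subscriptions/import-automation-sub"
endpoint = (f"https://us-central1-{project}.cloudfunctions.net/"
            "import-automation-helper")
service_account = "965988403328-compute@developer.gserviceaccount.com"

subscriber = pubsub_v1.SubscriberClient()
push_config = pubsub_v1.types.PushConfig(
    push_endpoint=endpoint,
    oidc_token=pubsub_v1.types.PushConfig.OidcToken(
        service_account_email=service_account))
subscriber.create_subscription(
    request={
        "name": subscription,
        "topic": topic,
        "push_config": push_config,
        # Only completed transfers should hit the function.
        "filter": 'attributes.transfer_status="TRANSFER_COMPLETED"',
    })
```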

import-automation/workflow/import-automation-workflow.yaml (10 changes: 5 additions & 5 deletions)
@@ -5,7 +5,7 @@ main:
assign:
- projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- region: ${sys.get_env("LOCATION")}
- imageUri: "gcr.io/datcom-ci/dc-import-executor:stable"
- imageUri: ${default(map.get(args, "imageUri"), "gcr.io/datcom-ci/dc-import-executor:stable")}
- jobId: ${text.substring(args.jobName, 0, 50) + "-" + string(int(sys.now()))}
- importName: ${args.importName}
- importConfig: ${args.importConfig}
@@ -73,10 +73,10 @@
body:
actionType: 'update_import_status'
jobId: ${jobId}
importName: ${text.split(args.importName, ":")[1]}
importName: ${args.importName}
status: 'FAILED'
execTime: ${int(sys.now() - startTime)}
version: ${"gs://" + gcsImportBucket + "/" + text.replace_all(args.importName, ":", "/")}
executionTime: ${int(sys.now() - startTime)}
latestVersion: ${"gs://" + gcsImportBucket + "/" + text.replace_all(args.importName, ":", "/")}
schedule: ${default(map.get(args, "schedule"), "")}
result: functionResponse
- failWorkflow:
@@ -91,7 +91,7 @@ main:
actionType: 'update_import_version'
importName: ${args.importName}
version: 'staging'
comment: 'import-automation'
comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}'
result: functionResponse
- returnResult:
return:
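
The `imageUri` default above means callers can now pin a different executor image per run. A rough sketch of starting the workflow with that override via the Workflows Executions client; the project, location, and argument values are placeholders, not values from this PR:

```python
# Sketch: execute import-automation-workflow with an imageUri override.
# Project, location, and argument values below are placeholders.
import json
from google.cloud.workflows import executions_v1

client = executions_v1.ExecutionsClient()
parent = ("projects/my-project/locations/us-central1/"
          "workflows/import-automation-workflow")
args = {
    "jobName": "example-import-job",
    "importName": "scripts/example:import",
    "importConfig": "{}",
    # Optional: falls back to gcr.io/datcom-ci/dc-import-executor:stable.
    "imageUri": "gcr.io/datcom-ci/dc-import-executor:latest",
}
execution = client.create_execution(
    parent=parent,
    execution=executions_v1.Execution(argument=json.dumps(args)))
print(execution.name)
```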
import-automation/workflow/import-helper/main.py (96 changes: 43 additions & 53 deletions)
@@ -18,26 +18,24 @@
import logging
import os
from datetime import datetime, timezone
from google.cloud import spanner
from google.cloud.spanner_v1 import Transaction
from google.auth.transport.requests import Request
from google.oauth2 import id_token
import google.cloud.workflows
import requests

logging.getLogger().setLevel(logging.INFO)

PROJECT_ID = os.environ.get('PROJECT_ID')
LOCATION = os.environ.get('LOCATION')
WORKFLOW_ID = os.environ.get('WORKFLOW_ID', 'spanner-ingestion-workflow')
SPANNER_PROJECT_ID = os.environ.get('SPANNER_PROJECT_ID')
SPANNER_INSTANCE_ID = os.environ.get('SPANNER_INSTANCE_ID')
SPANNER_DATABASE_ID = os.environ.get('SPANNER_DATABASE_ID')
DEFAULT_GRAPH_PATH = "/**/*.mcf*"
WORKFLOW_ID = 'spanner-ingestion-workflow'
INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper"


def invoke_ingestion_workflow(import_name):
"""Invokes the spanner ingestion workflow."""
execution_client = executions_v1.ExecutionsClient()
execution_client = workflows.executions_v1.ExecutionsClient()
parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}"
workflow_args = {"imports": [import_name]}
workflow_args = {"importList": [import_name]}
try:
execution_req = workflows.executions_v1.Execution(
argument=json.dumps(workflow_args))
@@ -50,52 +48,31 @@ def invoke_ingestion_workflow(import_name):
logging.error(f"Error triggering workflow: {e}")


def update_import_status(import_name, dest_dir, status):
def update_import_status(import_name, request_json):
"""Updates the status for the specified import job."""
logging.info(f"Updating import status for {import_name} to {status}")
try:
spanner_client = spanner.Client(
project=SPANNER_PROJECT_ID,
client_options={'quota_project_id': SPANNER_PROJECT_ID})
instance = spanner_client.instance(SPANNER_INSTANCE_ID)
database = instance.database(SPANNER_DATABASE_ID)
job_id = ''
exec_time = 0
data_volume = 0
version = dest_dir
next_refresh = datetime.now(timezone.utc)
graph_paths = [DEFAULT_GRAPH_PATH]

def _record(transaction: Transaction):
columns = [
"ImportName", "State", "JobId", "ExecutionTime", "DataVolume",
"NextRefreshTimestamp", "LatestVersion", "GraphDataPaths",
"StatusUpdateTimestamp"
]
row_values = [
import_name, status, job_id, exec_time, data_volume,
next_refresh, version, graph_paths, spanner.COMMIT_TIMESTAMP
]
if status == 'READY':
columns.append("DataImportTimestamp")
row_values.append(spanner.COMMIT_TIMESTAMP)
transaction.insert_or_update(table="ImportStatus",
columns=columns,
values=[row_values])
logging.info(f"Marked {import_name} as {status}.")

database.run_in_transaction(_record)
logging.info(f"Updated Spanner status for {import_name}")
auth_req = Request()
token = id_token.fetch_id_token(auth_req, INGESTION_HELPER_URL)
headers = {'Authorization': f'Bearer {token}'}
response = requests.post(INGESTION_HELPER_URL,
json=request_json,
headers=headers)
response.raise_for_status()
logging.info(f"Updated status for {import_name}")
except Exception as e:
logging.error(f'Error updating import status for {import_name}: {e}')


# Triggered from a message on a Cloud Pub/Sub topic.
@functions_framework.cloud_event
def handle_feed_event(cloud_event):
@functions_framework.http
def handle_feed_event(request):
# Updates status in spanner and triggers ingestion workflow
# for an import using CDA feed
pubsub_message = cloud_event.data['message']
request_json = request.get_json(silent=True)
if not request_json or 'message' not in request_json:
return 'Invalid Pub/Sub message format', 400

pubsub_message = request_json['message']
logging.info(f"Received Pub/Sub message: {pubsub_message}")
try:
data_bytes = base64.b64decode(pubsub_message["data"])
@@ -106,12 +83,25 @@ def handle_feed_event(cloud_event):

attributes = pubsub_message.get('attributes', {})
if attributes.get('transfer_status') == 'TRANSFER_COMPLETED':
feed_type = attributes.get('feed_type')
import_name = attributes.get('import_name')
dest_dir = attributes.get('dest_dir')
if feed_type == 'cns_to_gcs':
logging.info(f'Updating {import_name} import status')
update_import_status(import_name, dest_dir, 'READY')
import_status = attributes.get('import_status', 'STAGING')
import_version = attributes.get(
'import_version',
datetime.now(timezone.utc).strftime("%Y-%m-%d"))
graph_path = attributes.get('graph_paths', "/**/*mcf*")
request = {
'actionType': 'update_import_status',
'importName': import_name,
'status': import_status,
'latestVersion': import_version,
'nextRefresh': datetime.now(timezone.utc).isoformat(),
'graphPath': graph_path
}

logging.info(
f"Updating import status for {import_name} to {import_status}")
update_import_status(import_name, request)
if import_status == 'READY':
invoke_ingestion_workflow(import_name)
else:
logging.info(f'Unknown feed type: {feed_type}')

return 'OK', 200
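
Since `handle_feed_event` is now an HTTP function that parses a Pub/Sub push envelope, a quick manual test can post one directly. A sketch, with an assumed function URL and illustrative attribute values (auth is normally attached by the push subscription):

```python
# Sketch: post a Pub/Sub-style push envelope to the HTTP-triggered
# import-automation-helper. URL and attribute values are illustrative.
import base64
import json
import requests

function_url = ("https://us-central1-my-project.cloudfunctions.net/"
                "import-automation-helper")  # assumed URL
envelope = {
    "message": {
        "data": base64.b64encode(json.dumps({}).encode()).decode(),
        "attributes": {
            "transfer_status": "TRANSFER_COMPLETED",
            "import_name": "scripts/example:import",
            "import_status": "READY",        # defaults to STAGING if absent
            "import_version": "2025-01-01",  # defaults to today's date
            "graph_paths": "/**/*mcf*",
        },
    }
}
# In production the push subscription attaches an OIDC token; a manual
# test would also need an Authorization: Bearer <id_token> header.
resp = requests.post(function_url, json=envelope)
print(resp.status_code, resp.text)
```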
import-automation/workflow/import-helper/requirements.txt (5 changes: 3 additions & 2 deletions)
@@ -1,3 +1,4 @@
functions-framework==3.*
google-cloud-spanner
google-cloud-workflows
google-cloud-workflows
google-auth
requests
import-automation/workflow/ingestion-helper/README.md (7 changes: 5 additions & 2 deletions)
@@ -41,10 +41,13 @@ Updates the status of a specific import job.
* `importName` (Required): The name of the import.
* `status` (Required): The new status to set.
* `jobId` (Optional): The Dataflow job ID.
* `execTime` (Optional): Execution time in seconds.
* `executionTime` (Optional): Execution time in seconds.
* `dataVolume` (Optional): Data volume in bytes.
* `version` (Optional): The version string.
* `latestVersion` (Optional): Latest version string.
* `graphPath` (Optional): Graph path regex.
* `schedule` (Optional): A cron schedule string.
* `nextRefresh` (Optional): Next refresh timestamp.
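
A possible request body combining the fields above (all values are illustrative):

```python
# Illustrative update_import_status payload; field names follow the list above.
payload = {
    "actionType": "update_import_status",
    "importName": "scripts/example:import",
    "status": "READY",
    "jobId": "2025-01-01_00_00_00-1234567890",  # example Dataflow job ID
    "executionTime": 1200,
    "dataVolume": 1048576,
    "latestVersion": "gs://my-bucket/scripts/example/import/2025-01-01",
    "graphPath": "/**/*mcf*",
    "schedule": "0 6 * * 1",
    "nextRefresh": "2025-01-06T06:00:00+00:00",
}
```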


#### `update_import_version`
Updates the version of an import, records an audit log, and marks the import as `READY`.
import-automation/workflow/ingestion-helper/import_utils.py (80 changes: 30 additions & 50 deletions)
@@ -15,6 +15,7 @@

import logging
import croniter
import re
from datetime import datetime, timezone
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
@@ -41,7 +42,7 @@ def get_caller_identity(request):
logging.warning(
f"Could not decode unverified token for debugging: {e}")
email = unverified_claims.get('email', 'unknown_email')
return f"{email} (unverified)"
return f"{email}"
return 'decode_error'
else:
logging.warning(
@@ -51,7 +52,7 @@ def get_import_params(request_json) -> dict:
return 'no_auth_header'


def get_import_params(request_json) -> dict:
def get_import_params(request) -> dict:
"""Extracts and calculates import parameters from the request JSON.

Args:
@@ -60,68 +61,47 @@
Returns:
A dictionary with import params.
"""
import_name = request_json.get('importName', '')
status = request_json.get('status', '')
job_id = request_json.get('jobId', '')
exec_time = request_json.get('execTime', 0)
data_volume = request_json.get('dataVolume', 0)
version = request_json.get('version', '')
graph_paths = request_json.get('graph_paths', [])
schedule = request_json.get('schedule', '')
next_refresh = datetime.now(timezone.utc)
try:
next_refresh = croniter.croniter(schedule, datetime.now(
timezone.utc)).get_next(datetime)
except (croniter.CroniterError) as e:
logging.error(
f"Error calculating next refresh from schedule '{schedule}': {e}")
return {
'import_name': import_name,
'status': status,
'job_id': job_id,
'exec_time': exec_time,
'data_volume': data_volume,
'version': version,
'graph_paths': graph_paths,
'next_refresh': next_refresh
# Convert CamelCase or mixedCase to snake_case.
request_json = {
re.sub(r'(?<!^)(?=[A-Z])', '_', k).lower(): v
for k, v in request.items()
}


def create_import_params(summary) -> dict:
"""Creates import parameters from the import summary.

Args:
summary: A dictionary containing import summary details.

Returns:
A dictionary with import params.
"""
import_name = summary.get('import_name', '')
status = summary.get('status', '').removeprefix('ImportStatus.')
job_id = summary.get('job_id', '')
exec_time = summary.get('execution_time', 0)
data_volume = summary.get('data_volume', 0)
version = summary.get('latest_version', '')
graph_paths = summary.get('graph_paths', [])
next_refresh_str = summary.get('next_refresh', '')
next_refresh = None
import_name = request_json.get('import_name', '')
status = request_json.get('status', '').removeprefix('ImportStatus.')
job_id = request_json.get('job_id', '')
execution_time = request_json.get('execution_time', 0)
data_volume = request_json.get('data_volume', 0)
latest_version = request_json.get('latest_version', '')
graph_path = request_json.get('graph_path', '')
schedule = request_json.get('schedule', '')
next_refresh_str = request_json.get('next_refresh', '')
next_refresh = datetime.now(timezone.utc)
if next_refresh_str:
try:
next_refresh = datetime.fromisoformat(next_refresh_str)
except ValueError:
logging.error(f"Error parsing next_refresh: {next_refresh_str}")

if schedule:
try:
next_refresh = croniter.croniter(schedule, datetime.now(
timezone.utc)).get_next(datetime)
except (croniter.CroniterError) as e:
logging.error(
f"Error calculating next refresh from schedule '{schedule}': {e}"
)
return {
'import_name': import_name,
'status': status,
'job_id': job_id,
'exec_time': exec_time,
'execution_time': execution_time,
'data_volume': data_volume,
'version': version,
'graph_paths': graph_paths,
'next_refresh': next_refresh,
'latest_version': latest_version,
'graph_path': graph_path,
'next_refresh': next_refresh
}


def get_ingestion_metrics(project_id, location, job_id):
"""Fetches graph metrics (nodes, edges, observations) and execution time from a Dataflow job.

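
For reference, a small sketch of the two conversions `get_import_params` now performs: folding mixedCase request keys to snake_case with the regex from the diff, and deriving the next refresh time from a cron schedule via croniter (the schedule value is an example):

```python
# Sketch of the key normalization and schedule handling in get_import_params.
import re
from datetime import datetime, timezone

import croniter

def to_snake_case(key: str) -> str:
    # Insert "_" before each capital that is not at the start, then lowercase.
    return re.sub(r'(?<!^)(?=[A-Z])', '_', key).lower()

assert to_snake_case("latestVersion") == "latest_version"
assert to_snake_case("executionTime") == "execution_time"

schedule = "0 6 * * 1"  # example: Mondays at 06:00 UTC
next_refresh = croniter.croniter(
    schedule, datetime.now(timezone.utc)).get_next(datetime)
print(next_refresh.isoformat())
```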
import-automation/workflow/ingestion-helper/main.py (24 changes: 15 additions & 9 deletions)
@@ -126,18 +126,23 @@ def ingestion_helper(request):
# importName: name of the import
# status: new status
# jobId: Dataflow job ID (optional)
# execTime: execution time in seconds (optional)
# executionTime: execution time in seconds (optional)
# dataVolume: data volume in bytes (optional)
# version: version string (optional)
# latestVersion: latest version string (optional)
# graphPath: graph path regex (optional)
# schedule: cron schedule string (optional)
# nextRefresh: next refresh timestamp (optional)
validation_error = _validate_params(request_json,
['importName', 'status'])
if validation_error:
return (validation_error, 400)
import_name = request_json['importName']
import_name = request_json['importName'].split(':')[-1]
status = request_json['status']
logging.info(f'Updating import {import_name} to status {status}')
params = import_utils.get_import_params(request_json)
if status == 'STAGING':
latest_version = request_json['latestVersion']
storage.update_staging_version(import_name, latest_version)
spanner.update_import_status(params)
return (f"Updated import {import_name} to status {params['status']}",
200)
@@ -154,19 +159,20 @@
import_name = request_json['importName']
version = request_json['version']
comment = request_json['comment']
short_import_name = import_name.split(':')[-1]
caller = import_utils.get_caller_identity(request)
if not comment.startswith('import-workflow'):
comment = f'manual-update:{caller} {comment}'
logging.info(
f"Import {short_import_name} version {version} caller: {caller} comment: {comment}"
)
f"Import {import_name} version {version} comment: {comment}")
if version == 'staging':
version = storage.get_staging_version(import_name)
summary = storage.get_import_summary(import_name, version)
params = import_utils.create_import_params(summary)
params = import_utils.get_import_params(summary)
params['import_name'] = import_name.split(':')[-1]
params['status'] = 'READY'
storage.update_version_file(import_name, version)
spanner.update_version_history(import_name, version, caller, comment)
spanner.update_version_history(import_name, version, comment)
spanner.update_import_status(params)
return (f'Updated import {short_import_name} to version {version}', 200)
return (f'Updated import {import_name} to version {version}', 200)
else:
return (f'Unknown actionType: {actionType}', 400)
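
A sketch of promoting a staged import through the `update_import_version` action, assuming the Cloud Functions URL format used elsewhere in this PR; the import name and comment are placeholders:

```python
# Sketch: call the ingestion helper to promote the staged version of an
# import. URL, import name, and comment are placeholder assumptions.
import requests
from google.auth.transport.requests import Request
from google.oauth2 import id_token

url = ("https://us-central1-my-project.cloudfunctions.net/"
       "spanner-ingestion-helper")  # assumed URL
token = id_token.fetch_id_token(Request(), url)
resp = requests.post(
    url,
    json={
        "actionType": "update_import_version",
        "importName": "scripts/example:import",
        "version": "staging",  # resolved to the recorded staging version
        "comment": "promote latest verified run",
    },
    headers={"Authorization": f"Bearer {token}"})
print(resp.status_code, resp.text)
```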