From 2d7fb8c050349ca7737eac6e2e7956d1a95130d4 Mon Sep 17 00:00:00 2001 From: Vishal Gupta Date: Mon, 12 Jan 2026 08:43:31 +0000 Subject: [PATCH] Import automation fixes - Update version history for spanner ingestion - Use update_import_status cloud function for feed based imports - Update staging version file for feed based imports - Update import param names for consistent processing - Use http type handler for feed based imports - Use a single graph path instead of list --- import-automation/workflow/cloudbuild.yaml | 9 +- .../workflow/import-automation-workflow.yaml | 10 +- .../workflow/import-helper/main.py | 96 +++++++++---------- .../workflow/import-helper/requirements.txt | 5 +- .../workflow/ingestion-helper/README.md | 7 +- .../workflow/ingestion-helper/import_utils.py | 80 ++++++---------- .../workflow/ingestion-helper/main.py | 24 +++-- .../ingestion-helper/spanner_client.py | 48 +++++++--- .../ingestion-helper/storage_client.py | 38 ++++++-- .../workflow/spanner-ingestion-workflow.yaml | 12 ++- 10 files changed, 179 insertions(+), 150 deletions(-) diff --git a/import-automation/workflow/cloudbuild.yaml b/import-automation/workflow/cloudbuild.yaml index d76c9c2c76..d120638bf1 100644 --- a/import-automation/workflow/cloudbuild.yaml +++ b/import-automation/workflow/cloudbuild.yaml @@ -26,21 +26,22 @@ substitutions: steps: - id: 'import-automation-workflow' name: 'gcr.io/cloud-builders/gcloud' - args: ['workflows', 'deploy', 'import-automation-workflow', '--source', 'import-automation-workflow.yaml', '--project', '${_PROJECT_ID}', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}'] + args: ['workflows', 'deploy', 'import-automation-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}'] dir: 'import-automation/workflow' - id: 'spanner-ingestion-workflow' name: 'gcr.io/cloud-builders/gcloud' - args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--source', 'spanner-ingestion-workflow.yaml', '--project', '${_PROJECT_ID}', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}'] + args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}'] dir: 'import-automation/workflow' - id: 'spanner-ingestion-helper' name: 'gcr.io/cloud-builders/gcloud' - args: ['functions', 'deploy', 'spanner-ingestion-helper', '--runtime', 'python312', '--source', 'ingestion-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}'] + args: ['functions', 'deploy', 'spanner-ingestion-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'ingestion-helper', 
'--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'ingestion_helper', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},LOCATION=${_LOCATION}'] dir: 'import-automation/workflow' +# gcloud pubsub subscriptions create import-automation-sub --topic=import-automation-trigger --message-filter='attributes.transfer_status="TRANSFER_COMPLETED"' --push-endpoint=https://us-central1-datcom-import-automation-prod.cloudfunctions.net/import-automation-helper --push-auth-service-account=965988403328-compute@developer.gserviceaccount.com --project=datcom-import-automation-prod - id: 'import-automation-helper' name: 'gcr.io/cloud-builders/gcloud' - args: ['functions', 'deploy', 'import-automation-helper', '--runtime', 'python312', '--source', 'import-helper', '--trigger-topic', 'import-automation-trigger' , '--entry-point', 'handle_feed_event', '--project', '${_PROJECT_ID}', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},WORKFLOW_ID=spanner-ingestion-workflow'] + args: ['functions', 'deploy', 'import-automation-helper', '--gen2', '--project', '${_PROJECT_ID}', '--region', '${_LOCATION}', '--runtime', 'python312', '--source', 'import-helper', '--no-allow-unauthenticated', '--trigger-http', '--entry-point', 'handle_feed_event', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION}'] dir: 'import-automation/workflow' diff --git a/import-automation/workflow/import-automation-workflow.yaml b/import-automation/workflow/import-automation-workflow.yaml index 10a128bf4f..f0386f416d 100644 --- a/import-automation/workflow/import-automation-workflow.yaml +++ b/import-automation/workflow/import-automation-workflow.yaml @@ -5,7 +5,7 @@ main: assign: - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} - region: ${sys.get_env("LOCATION")} - - imageUri: "gcr.io/datcom-ci/dc-import-executor:stable" + - imageUri: ${default(map.get(args, "imageUri"), "gcr.io/datcom-ci/dc-import-executor:stable")} - jobId: ${text.substring(args.jobName, 0, 50) + "-" + string(int(sys.now()))} - importName: ${args.importName} - importConfig: ${args.importConfig} @@ -73,10 +73,10 @@ main: body: actionType: 'update_import_status' jobId: ${jobId} - importName: ${text.split(args.importName, ":")[1]} + importName: ${args.importName} status: 'FAILED' - execTime: ${int(sys.now() - startTime)} - version: ${"gs://" + gcsImportBucket + "/" + text.replace_all(args.importName, ":", "/")} + executionTime: ${int(sys.now() - startTime)} + latestVersion: ${"gs://" + gcsImportBucket + "/" + text.replace_all(args.importName, ":", "/")} schedule: ${default(map.get(args, "schedule"), "")} result: functionResponse - failWorkflow: @@ -91,7 +91,7 @@ main: actionType: 'update_import_version' importName: ${args.importName} version: 'staging' - comment: 'import-automation' + comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}' result: functionResponse - returnResult: return: diff --git a/import-automation/workflow/import-helper/main.py b/import-automation/workflow/import-helper/main.py index 6f954091bf..e7159e5b6c 100644 --- a/import-automation/workflow/import-helper/main.py +++ b/import-automation/workflow/import-helper/main.py @@ -18,26 +18,24 @@ import logging import os from datetime import datetime, 
timezone -from google.cloud import spanner -from google.cloud.spanner_v1 import Transaction +from google.auth.transport.requests import Request +from google.oauth2 import id_token import google.cloud.workflows +import requests logging.getLogger().setLevel(logging.INFO) PROJECT_ID = os.environ.get('PROJECT_ID') LOCATION = os.environ.get('LOCATION') -WORKFLOW_ID = os.environ.get('WORKFLOW_ID', 'spanner-ingestion-workflow') -SPANNER_PROJECT_ID = os.environ.get('SPANNER_PROJECT_ID') -SPANNER_INSTANCE_ID = os.environ.get('SPANNER_INSTANCE_ID') -SPANNER_DATABASE_ID = os.environ.get('SPANNER_DATABASE_ID') -DEFAULT_GRAPH_PATH = "/**/*.mcf*" +WORKFLOW_ID = 'spanner-ingestion-workflow' +INGESTION_HELPER_URL = f"https://{LOCATION}-{PROJECT_ID}.cloudfunctions.net/spanner-ingestion-helper" def invoke_ingestion_workflow(import_name): """Invokes the spanner ingestion workflow.""" - execution_client = executions_v1.ExecutionsClient() + execution_client = workflows.executions_v1.ExecutionsClient() parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{WORKFLOW_ID}" - workflow_args = {"imports": [import_name]} + workflow_args = {"importList": [import_name]} try: execution_req = workflows.executions_v1.Execution( argument=json.dumps(workflow_args)) @@ -50,52 +48,31 @@ def invoke_ingestion_workflow(import_name): logging.error(f"Error triggering workflow: {e}") -def update_import_status(import_name, dest_dir, status): +def update_import_status(import_name, request_json): """Updates the status for the specified import job.""" - logging.info(f"Updating import status for {import_name} to {status}") try: - spanner_client = spanner.Client( - project=SPANNER_PROJECT_ID, - client_options={'quota_project_id': SPANNER_PROJECT_ID}) - instance = spanner_client.instance(SPANNER_INSTANCE_ID) - database = instance.database(SPANNER_DATABASE_ID) - job_id = '' - exec_time = 0 - data_volume = 0 - version = dest_dir - next_refresh = datetime.now(timezone.utc) - graph_paths = [DEFAULT_GRAPH_PATH] - - def _record(transaction: Transaction): - columns = [ - "ImportName", "State", "JobId", "ExecutionTime", "DataVolume", - "NextRefreshTimestamp", "LatestVersion", "GraphDataPaths", - "StatusUpdateTimestamp" - ] - row_values = [ - import_name, status, job_id, exec_time, data_volume, - next_refresh, version, graph_paths, spanner.COMMIT_TIMESTAMP - ] - if status == 'READY': - columns.append("DataImportTimestamp") - row_values.append(spanner.COMMIT_TIMESTAMP) - transaction.insert_or_update(table="ImportStatus", - columns=columns, - values=[row_values]) - logging.info(f"Marked {import_name} as {status}.") - - database.run_in_transaction(_record) - logging.info(f"Updated Spanner status for {import_name}") + auth_req = Request() + token = id_token.fetch_id_token(auth_req, INGESTION_HELPER_URL) + headers = {'Authorization': f'Bearer {token}'} + response = requests.post(INGESTION_HELPER_URL, + json=request_json, + headers=headers) + response.raise_for_status() + logging.info(f"Updated status for {import_name}") except Exception as e: logging.error(f'Error updating import status for {import_name}: {e}') # Triggered from a message on a Cloud Pub/Sub topic. 
-@functions_framework.cloud_event -def handle_feed_event(cloud_event): +@functions_framework.http +def handle_feed_event(request): # Updates status in spanner and triggers ingestion workflow # for an import using CDA feed - pubsub_message = cloud_event.data['message'] + request_json = request.get_json(silent=True) + if not request_json or 'message' not in request_json: + return 'Invalid Pub/Sub message format', 400 + + pubsub_message = request_json['message'] logging.info(f"Received Pub/Sub message: {pubsub_message}") try: data_bytes = base64.b64decode(pubsub_message["data"]) @@ -106,12 +83,25 @@ def handle_feed_event(cloud_event): attributes = pubsub_message.get('attributes', {}) if attributes.get('transfer_status') == 'TRANSFER_COMPLETED': - feed_type = attributes.get('feed_type') import_name = attributes.get('import_name') - dest_dir = attributes.get('dest_dir') - if feed_type == 'cns_to_gcs': - logging.info(f'Updating {import_name} import status') - update_import_status(import_name, dest_dir, 'READY') + import_status = attributes.get('import_status', 'STAGING') + import_version = attributes.get( + 'import_version', + datetime.now(timezone.utc).strftime("%Y-%m-%d")) + graph_path = attributes.get('graph_paths', "/**/*mcf*") + request = { + 'actionType': 'update_import_status', + 'importName': import_name, + 'status': import_status, + 'latestVersion': import_version, + 'nextRefresh': datetime.now(timezone.utc).isoformat(), + 'graphPath': graph_path + } + + logging.info( + f"Updating import status for {import_name} to {import_status}") + update_import_status(import_name, request) + if import_status == 'READY': invoke_ingestion_workflow(import_name) - else: - logging.info(f'Unknown feed type: {feed_type}') + + return 'OK', 200 diff --git a/import-automation/workflow/import-helper/requirements.txt b/import-automation/workflow/import-helper/requirements.txt index 6ef5bb17a0..d70e560976 100644 --- a/import-automation/workflow/import-helper/requirements.txt +++ b/import-automation/workflow/import-helper/requirements.txt @@ -1,3 +1,4 @@ functions-framework==3.* -google-cloud-spanner -google-cloud-workflows \ No newline at end of file +google-cloud-workflows +google-auth +requests diff --git a/import-automation/workflow/ingestion-helper/README.md b/import-automation/workflow/ingestion-helper/README.md index d6d5815342..0a0fa84091 100644 --- a/import-automation/workflow/ingestion-helper/README.md +++ b/import-automation/workflow/ingestion-helper/README.md @@ -41,10 +41,13 @@ Updates the status of a specific import job. * `importName` (Required): The name of the import. * `status` (Required): The new status to set. * `jobId` (Optional): The Dataflow job ID. -* `execTime` (Optional): Execution time in seconds. +* `executionTime` (Optional): Execution time in seconds. * `dataVolume` (Optional): Data volume in bytes. -* `version` (Optional): The version string. +* `latestVersion` (Optional): Latest version string. +* `graphPath` (Optional): Graph path regex. * `schedule` (Optional): A cron schedule string. +* `nextRefresh` (Optional): Next refresh timestamp. + #### `update_import_version` Updates the version of an import, records an audit log, and marks the import as `READY`. 
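Note (not part of the patch): a minimal sketch of what a caller of the renamed `update_import_status` action might send, mirroring the authenticated POST the import helper now performs. The function URL, import name, and field values below are placeholders.

```python
# Sketch only: example payload for the ingestion helper's
# 'update_import_status' action using the renamed fields
# (executionTime, latestVersion, graphPath, nextRefresh).
import requests
from google.auth.transport.requests import Request
from google.oauth2 import id_token

# Placeholder URL; the real one is derived from LOCATION and PROJECT_ID.
FUNCTION_URL = "https://REGION-PROJECT.cloudfunctions.net/spanner-ingestion-helper"

payload = {
    "actionType": "update_import_status",
    "importName": "scripts/example:example_import",  # hypothetical import name
    "status": "READY",
    "jobId": "",                       # optional Dataflow job ID
    "executionTime": 1200,             # seconds (was execTime)
    "dataVolume": 0,                   # bytes
    "latestVersion": "gs://BUCKET/scripts/example/example_import/2026-01-12",
    "graphPath": "/**/*mcf*",          # single path (was a graph_paths list)
    "nextRefresh": "2026-01-19T00:00:00+00:00",
}

# Authenticate to the Cloud Function with an ID token, as the import helper does.
token = id_token.fetch_id_token(Request(), FUNCTION_URL)
resp = requests.post(FUNCTION_URL, json=payload,
                     headers={"Authorization": f"Bearer {token}"})
resp.raise_for_status()
```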
diff --git a/import-automation/workflow/ingestion-helper/import_utils.py b/import-automation/workflow/ingestion-helper/import_utils.py index 340b604545..5cb64c7ed0 100644 --- a/import-automation/workflow/ingestion-helper/import_utils.py +++ b/import-automation/workflow/ingestion-helper/import_utils.py @@ -15,6 +15,7 @@ import logging import croniter +import re from datetime import datetime, timezone from googleapiclient.discovery import build from googleapiclient.errors import HttpError @@ -41,7 +42,7 @@ def get_caller_identity(request): logging.warning( f"Could not decode unverified token for debugging: {e}") email = unverified_claims.get('email', 'unknown_email') - return f"{email} (unverified)" + return f"{email}" return 'decode_error' else: logging.warning( @@ -51,7 +52,7 @@ def get_caller_identity(request): return 'no_auth_header' -def get_import_params(request_json) -> dict: +def get_import_params(request) -> dict: """Extracts and calculates import parameters from the request JSON. Args: @@ -60,68 +61,47 @@ Returns: A dictionary with import params. """ - import_name = request_json.get('importName', '') - status = request_json.get('status', '') - job_id = request_json.get('jobId', '') - exec_time = request_json.get('execTime', 0) - data_volume = request_json.get('dataVolume', 0) - version = request_json.get('version', '') - graph_paths = request_json.get('graph_paths', []) - schedule = request_json.get('schedule', '') - next_refresh = datetime.now(timezone.utc) - try: - next_refresh = croniter.croniter(schedule, datetime.now( - timezone.utc)).get_next(datetime) - except (croniter.CroniterError) as e: - logging.error( - f"Error calculating next refresh from schedule '{schedule}': {e}") - return { - 'import_name': import_name, - 'status': status, - 'job_id': job_id, - 'exec_time': exec_time, - 'data_volume': data_volume, - 'version': version, - 'graph_paths': graph_paths, - 'next_refresh': next_refresh + # Convert CamelCase or mixedCase to snake_case. + request_json = { + re.sub(r'(?<!^)(?=[A-Z])', '_', k).lower(): v + for k, v in request.items() + } -def create_import_params(summary) -> dict: - """Creates import parameters from the import summary. - - Args: - summary: A dictionary containing import summary details. - - Returns: - A dictionary with import params. 
- """ - import_name = summary.get('import_name', '') - status = summary.get('status', '').removeprefix('ImportStatus.') - job_id = summary.get('job_id', '') - exec_time = summary.get('execution_time', 0) - data_volume = summary.get('data_volume', 0) - version = summary.get('latest_version', '') - graph_paths = summary.get('graph_paths', []) - next_refresh_str = summary.get('next_refresh', '') - next_refresh = None + import_name = request_json.get('import_name', '') + status = request_json.get('status', '').removeprefix('ImportStatus.') + job_id = request_json.get('job_id', '') + execution_time = request_json.get('execution_time', 0) + data_volume = request_json.get('data_volume', 0) + latest_version = request_json.get('latest_version', '') + graph_path = request_json.get('graph_path', '') + schedule = request_json.get('schedule', '') + next_refresh_str = request_json.get('next_refresh', '') + next_refresh = datetime.now(timezone.utc) if next_refresh_str: try: next_refresh = datetime.fromisoformat(next_refresh_str) except ValueError: logging.error(f"Error parsing next_refresh: {next_refresh_str}") - + if schedule: + try: + next_refresh = croniter.croniter(schedule, datetime.now( + timezone.utc)).get_next(datetime) + except (croniter.CroniterError) as e: + logging.error( + f"Error calculating next refresh from schedule '{schedule}': {e}" + ) return { 'import_name': import_name, 'status': status, 'job_id': job_id, - 'exec_time': exec_time, + 'execution_time': execution_time, 'data_volume': data_volume, - 'version': version, - 'graph_paths': graph_paths, - 'next_refresh': next_refresh, + 'latest_version': latest_version, + 'graph_path': graph_path, + 'next_refresh': next_refresh } + def get_ingestion_metrics(project_id, location, job_id): """Fetches graph metrics (nodes, edges, observations) and execution time from a Dataflow job. 
diff --git a/import-automation/workflow/ingestion-helper/main.py b/import-automation/workflow/ingestion-helper/main.py index f3585a32de..39e8615d45 100644 --- a/import-automation/workflow/ingestion-helper/main.py +++ b/import-automation/workflow/ingestion-helper/main.py @@ -126,18 +126,23 @@ def ingestion_helper(request): # importName: name of the import # status: new status # jobId: Dataflow job ID (optional) - # execTime: execution time in seconds (optional) + # executionTime: execution time in seconds (optional) # dataVolume: data volume in bytes (optional) - # version: version string (optional) + # latestVersion: latest version string (optional) + # graphPath: graph path regex (optional) # schedule: cron schedule string (optional) + # nextRefresh: next refresh timestamp (optional) validation_error = _validate_params(request_json, ['importName', 'status']) if validation_error: return (validation_error, 400) - import_name = request_json['importName'] + import_name = request_json['importName'].split(':')[-1] status = request_json['status'] logging.info(f'Updating import {import_name} to status {status}') params = import_utils.get_import_params(request_json) + if status == 'STAGING': + latest_version = request_json['latestVersion'] + storage.update_staging_version(import_name, latest_version) spanner.update_import_status(params) return (f"Updated import {import_name} to status {params['status']}", 200) @@ -154,19 +159,20 @@ def ingestion_helper(request): import_name = request_json['importName'] version = request_json['version'] comment = request_json['comment'] - short_import_name = import_name.split(':')[-1] caller = import_utils.get_caller_identity(request) + if not comment.startswith('import-workflow'): + comment = f'manual-update:{caller} {comment}' logging.info( - f"Import {short_import_name} version {version} caller: {caller} comment: {comment}" - ) + f"Import {import_name} version {version} comment: {comment}") if version == 'staging': version = storage.get_staging_version(import_name) summary = storage.get_import_summary(import_name, version) - params = import_utils.create_import_params(summary) + params = import_utils.get_import_params(summary) + params['import_name'] = import_name.split(':')[-1] params['status'] = 'READY' storage.update_version_file(import_name, version) - spanner.update_version_history(import_name, version, caller, comment) + spanner.update_version_history(import_name, version, comment) spanner.update_import_status(params) - return (f'Updated import {short_import_name} to version {version}', 200) + return (f'Updated import {import_name} to version {version}', 200) else: return (f'Unknown actionType: {actionType}', 400) diff --git a/import-automation/workflow/ingestion-helper/spanner_client.py b/import-automation/workflow/ingestion-helper/spanner_client.py index b1b1783472..55c62f2449 100644 --- a/import-automation/workflow/ingestion-helper/spanner_client.py +++ b/import-automation/workflow/ingestion-helper/spanner_client.py @@ -13,6 +13,7 @@ # limitations under the License. 
import logging +import os from google.cloud import spanner from google.cloud.spanner_v1 import Transaction from google.cloud.spanner_v1.param_types import STRING, TIMESTAMP, Array, INT64 @@ -126,7 +127,7 @@ def _release(transaction: Transaction) -> None: def get_import_list(self, import_list: list) -> list: """Get the list of imports ready to ingest.""" pending_imports = [] - sql = "SELECT ImportName, LatestVersion, GraphDataPaths FROM ImportStatus WHERE State = 'READY'" + sql = "SELECT ImportName, LatestVersion, GraphPath FROM ImportStatus WHERE State = 'READY'" # Use a read-only snapshot for this query try: with self.database.snapshot() as snapshot: @@ -136,7 +137,7 @@ def get_import_list(self, import_list: list) -> list: import_json = {} import_json['importName'] = row[0] import_json['latestVersion'] = row[1] - import_json['graphDataPaths'] = row[2] + import_json['graphPath'] = row[2] pending_imports.append(import_json) logging.info(f"Found {len(pending_imports)} import jobs as READY.") @@ -185,6 +186,25 @@ def _record(transaction: Transaction): transaction.insert_or_update(table="IngestionHistory", columns=columns, values=values) + + # 3. Update ImportVersionHistory table + version_history_columns = [ + "ImportName", "Version", "UpdateTimestamp", "Comment" + ] + version_history_values = [] + for import_json in import_list_json: + version_history_values.append([ + import_json['importName'], + os.path.basename(import_json['latestVersion']), + spanner.COMMIT_TIMESTAMP, + "ingestion-workflow:" + workflow_id + ]) + + if version_history_values: + transaction.insert(table="ImportVersionHistory", + columns=version_history_columns, + values=version_history_values) + logging.info( f"Updated ingestion history table for workflow {workflow_id}") @@ -198,24 +218,25 @@ def update_import_status(self, params: dict): """Updates the status for the specified import job.""" import_name = params['import_name'] job_id = params['job_id'] - exec_time = params['exec_time'] + execution_time = params['execution_time'] data_volume = params['data_volume'] status = params['status'] - version = params['version'] + latest_version = params['latest_version'] next_refresh = params['next_refresh'] - graph_paths = params['graph_paths'] + graph_path = params['graph_path'] logging.info(f"Updating import status for {import_name} to {status}") def _record(transaction: Transaction): columns = [ "ImportName", "State", "JobId", "ExecutionTime", "DataVolume", - "NextRefreshTimestamp", "LatestVersion", "GraphDataPaths", + "NextRefreshTimestamp", "LatestVersion", "GraphPath", "StatusUpdateTimestamp" ] row_values = [ - import_name, status, job_id, exec_time, data_volume, - next_refresh, version, graph_paths, spanner.COMMIT_TIMESTAMP + import_name, status, job_id, execution_time, data_volume, + next_refresh, latest_version, graph_path, + spanner.COMMIT_TIMESTAMP ] if status == 'READY': @@ -236,7 +257,7 @@ def _record(transaction: Transaction): raise def update_version_history(self, import_name: str, version: str, - caller: str, comment: str): + comment: str): """Updates the version history table. Args: @@ -244,15 +265,12 @@ def update_version_history(self, import_name: str, version: str, version: The version string. comment: The comment for the update. 
""" + import_name = import_name.split(':')[-1] logging.info(f"Updating version history for {import_name} to {version}") def _record(transaction: Transaction): - columns = [ - "ImportName", "Version", "UpdateTimestamp", "Caller", "Comment" - ] - values = [[ - import_name, version, spanner.COMMIT_TIMESTAMP, caller, comment - ]] + columns = ["ImportName", "Version", "UpdateTimestamp", "Comment"] + values = [[import_name, version, spanner.COMMIT_TIMESTAMP, comment]] transaction.insert(table="ImportVersionHistory", columns=columns, values=values) diff --git a/import-automation/workflow/ingestion-helper/storage_client.py b/import-automation/workflow/ingestion-helper/storage_client.py index d43812b7cc..165c3fdf35 100644 --- a/import-automation/workflow/ingestion-helper/storage_client.py +++ b/import-automation/workflow/ingestion-helper/storage_client.py @@ -22,6 +22,9 @@ logging.getLogger().setLevel(logging.INFO) _STAGING_VERSION_FILE = 'staging_version.txt' +_LATEST_VERSION_FILE = 'latest_version.txt' +_IMPORT_METADATA_MCF = 'import_metadata_mcf.mcf' +_IMPORT_SUMMARY_JSON = 'import_summary.json' class StorageClient: @@ -42,7 +45,7 @@ def get_import_summary(self, import_name: str, version: str) -> dict: A dictionary containing the import summary, or an empty dict if not found. """ output_dir = import_name.replace(':', '/') - summary_file = os.path.join(output_dir, version, 'import_summary.json') + summary_file = os.path.join(output_dir, version, _IMPORT_SUMMARY_JSON) logging.info(f'Reading import summary from {summary_file}') try: blob = self.bucket.blob(summary_file) @@ -74,6 +77,22 @@ def get_staging_version(self, import_name: str) -> str: logging.error(f"Version file {version_file} not found") return '' + def update_staging_version(self, import_name: str, version: str): + """Updates the staging version file in GCS. + + Args: + import_name: The name of the import. + version: The new version string. + """ + try: + output_dir = import_name.replace(':', '/') + version_file = self.bucket.blob( + os.path.join(output_dir, _STAGING_VERSION_FILE)) + version_file.upload_from_string(version) + except exceptions.NotFound as e: + logging.error(f'Error updating version file for {import_name}: {e}') + raise + def update_version_file(self, import_name: str, version: str): """Updates the version file in GCS. 
@@ -87,12 +106,17 @@ def update_version_file(self, import_name: str, version: str): try: output_dir = import_name.replace(':', '/') version_file = self.bucket.blob( - os.path.join(output_dir, 'latest_version.txt')) - self.bucket.copy_blob( - self.bucket.blob( - os.path.join(output_dir, version, - 'import_metadata_mcf.mcf')), self.bucket, - os.path.join(output_dir, 'import_metadata_mcf.mcf')) + os.path.join(output_dir, _LATEST_VERSION_FILE)) + metadata_blob = self.bucket.blob( + os.path.join(output_dir, version, 'import_metadata_mcf.mcf')) + if metadata_blob.exists(): + self.bucket.copy_blob( + metadata_blob, self.bucket, + os.path.join(output_dir, 'import_metadata_mcf.mcf')) + else: + logging.error( + f'Metadata file not found for import {import_name} {version}' + ) version_file.upload_from_string(version) except exceptions.NotFound as e: logging.error(f'Error updating version file for {import_name}: {e}') diff --git a/import-automation/workflow/spanner-ingestion-workflow.yaml b/import-automation/workflow/spanner-ingestion-workflow.yaml index 41b3df79b7..3cfb8c3c23 100644 --- a/import-automation/workflow/spanner-ingestion-workflow.yaml +++ b/import-automation/workflow/spanner-ingestion-workflow.yaml @@ -3,16 +3,18 @@ main: steps: - init: assign: - - lock_timeout: 86400 # 24 hours + - lock_timeout: 82800 # 23 hours - wait_period: 300 # seconds - project_id: '${sys.get_env("PROJECT_ID")}' - dataflow_job_name: 'ingestion-job' - dataflow_gcs_path: 'gs://datcom-templates/templates/flex/ingestion.json' - location: '${sys.get_env("LOCATION")}' + - spanner_project_id: '${sys.get_env("SPANNER_PROJECT_ID")}' + - spanner_instance_id: '${sys.get_env("SPANNER_INSTANCE_ID")}' - spanner_database_id: '${sys.get_env("SPANNER_DATABASE_ID")}' - ingestion_helper: "spanner-ingestion-helper" - function_url: ${"https://" + location + "-" + project_id + ".cloudfunctions.net/" + ingestion_helper} - - import_list: ${default(map.get(args, "imports"), [])} + - import_list: ${default(map.get(args, "importList"), [])} - execution_error: null - acquire_ingestion_lock: call: http.post @@ -46,6 +48,8 @@ main: job_name: ${dataflow_job_name} template_gcs_path: ${dataflow_gcs_path} location: ${location} + spanner_project_id: ${spanner_project_id} + spanner_instance_id: ${spanner_instance_id} spanner_database_id: ${spanner_database_id} wait_period: ${wait_period} result: dataflow_job_id @@ -86,7 +90,7 @@ main: # This sub-workflow launches a Dataflow job and waits for it to complete. run_dataflow_job: - params: [import_list, project_id, job_name, template_gcs_path, location, spanner_database_id, wait_period] + params: [import_list, project_id, job_name, template_gcs_path, location, spanner_project_id, spanner_instance_id, spanner_database_id, wait_period] steps: - init: assign: @@ -111,6 +115,8 @@ run_dataflow_job: jobName: '${jobName}' parameters: importList: '${import_list}' + projectId: '${spanner_project_id}' + spannerInstanceId: '${spanner_instance_id}' spannerDatabaseId: '${spanner_database_id}' environment: numWorkers: 3
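Note (not part of the patch): with the workflow argument renamed from `imports` to `importList`, a manual trigger of the spanner ingestion workflow might look like the following sketch; the project, location, and import name are placeholders.

```python
# Sketch of triggering spanner-ingestion-workflow with the renamed
# "importList" argument; identifiers below are placeholders.
import json
from google.cloud.workflows import executions_v1

client = executions_v1.ExecutionsClient()
parent = ("projects/PROJECT_ID/locations/LOCATION/"
          "workflows/spanner-ingestion-workflow")
execution = executions_v1.Execution(
    argument=json.dumps({"importList": ["scripts/example:example_import"]}))
response = client.create_execution(parent=parent, execution=execution)
print(f"Started execution: {response.name}")
```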