[DCP - Ingestion Helper] Major code refactor for modularization and cleanliness #2036
Open
gmechali wants to merge 3 commits into datacommonsorg:master from gmechali:refactor
1 change: 1 addition & 0 deletions
import-automation/workflow/ingestion-helper/handlers/__init__.py
@@ -0,0 +1 @@
```python
# Initialize handlers package
```
40 changes: 40 additions & 0 deletions
import-automation/workflow/ingestion-helper/handlers/aggregation.py
@@ -0,0 +1,40 @@
```python
import logging
import os
from absl import flags
from .helpers.aggregation_utils import AggregationUtils

FLAGS = flags.FLAGS

def handle_run_aggregation(request_json):
    """Runs aggregation logic for the specified imports."""
    import_list = request_json.get('importList', [])

    # Validate required flags are not empty or None
    missing_flags = []
    if not FLAGS.spanner_connection_id:
        missing_flags.append('spanner_connection_id (BQ_SPANNER_CONN_ID)')
    if not FLAGS.spanner_project_id:
        missing_flags.append('spanner_project_id (SPANNER_PROJECT_ID)')
    if not FLAGS.spanner_instance_id:
        missing_flags.append('spanner_instance_id (SPANNER_INSTANCE_ID)')
    if not FLAGS.spanner_graph_database_id:
        missing_flags.append('spanner_graph_database_id (SPANNER_GRAPH_DATABASE_ID)')

    if missing_flags:
        error_msg = f"Missing required configuration flags/env-vars: {', '.join(missing_flags)}"
        logging.error(error_msg)
        return (error_msg, 400)

    aggregation = AggregationUtils(
        connection_id=FLAGS.spanner_connection_id,
        project_id=FLAGS.spanner_project_id,
        instance_id=FLAGS.spanner_instance_id,
        database_id=FLAGS.spanner_graph_database_id,
    )
    try:
        if aggregation.run_aggregation(import_list):
            return ('OK', 200)
        else:
            return ('Aggregation failed', 500)
    except Exception as e:
        return (f"Aggregation failed: {str(e)}", 500)
```
22 changes: 22 additions & 0 deletions
import-automation/workflow/ingestion-helper/handlers/cache.py
@@ -0,0 +1,22 @@
```python
import logging
import os
from flask import jsonify
import redis

def handle_clear_redis_cache(request_json):
    """Flushes the Redis cache."""
    logging.info("Action: clear_redis_cache")
    redis_host = os.environ.get("REDIS_HOST")
    redis_port = os.environ.get("REDIS_PORT", "6379")
    if redis_host:
        try:
            r = redis.Redis(host=redis_host, port=int(redis_port))
            r.flushall(asynchronous=True)
            logging.info(f"Redis cache at {redis_host}:{redis_port} flushed successfully (async).")
            return jsonify({'status': 'SUCCESS', 'message': 'Cache cleared'}), 200
        except Exception as e:
            logging.error(f"Failed to flush Redis cache: {e}")
            return jsonify({'status': 'ERROR', 'message': str(e)}), 500
    else:
        logging.warning("REDIS_HOST not set, skipping cache flush.")
        return jsonify({'status': 'SKIPPED', 'message': 'REDIS_HOST not set'}), 200
```
Check warning (Code scanning / CodeQL) on the `return jsonify({'status': 'ERROR', 'message': str(e)}), 500` line: Information exposure through an exception (Medium, stack trace information).
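One common way to address this kind of CodeQL finding is to keep the exception text in the server logs and return a fixed message to the caller. The sketch below is not part of the PR: `error_response` and its default message are hypothetical names, and it returns a plain dict rather than a Flask response so that it runs standalone.

```python
import logging


def error_response(exc, public_message="Failed to clear cache"):
    """Log the exception server-side and return only a generic message.

    Hypothetical helper, not part of the PR under review.
    """
    logging.error("Failed to flush Redis cache: %s", exc)
    return {'status': 'ERROR', 'message': public_message}, 500


# The exception text (which may include host names or ports) stays in the
# logs and does not appear in the response body.
body, status_code = error_response(ConnectionError("10.0.0.5:6379 refused"))
```

In the handler, the `except` branch could then wrap `body` in `jsonify(...)` instead of passing `str(e)` through.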
18 changes: 18 additions & 0 deletions
import-automation/workflow/ingestion-helper/handlers/database.py
@@ -0,0 +1,18 @@
```python
import logging
from absl import flags

FLAGS = flags.FLAGS

def handle_initialize_database(spanner, request_json):
    """Initializes the database by creating all required tables and proto bundles."""
    logging.info("Action: initialize_database")
    enable_embeddings = request_json.get('enableEmbeddings',
                                         FLAGS.enable_embeddings)
    spanner.initialize_database(enable_embeddings=enable_embeddings)
    return ('OK', 200)

def handle_seed_database(spanner):
    """Seeds the database with base empty nodes."""
    logging.info("Action: seed_database")
    spanner.seed_database()
    return ('OK', 200)
```
26 changes: 26 additions & 0 deletions
import-automation/workflow/ingestion-helper/handlers/embeddings.py
@@ -0,0 +1,26 @@
```python
import logging
from absl import flags
from .helpers.embedding_utils import get_latest_lock_timestamp, get_updated_nodes, filter_and_convert_nodes, generate_embeddings_partitioned

FLAGS = flags.FLAGS

def handle_embedding_ingestion(spanner, request_json):
    """Handles embedding ingestion."""
    logging.info("Action: embedding_ingestion")
    enable_embeddings = request_json.get('enableEmbeddings',
                                         FLAGS.enable_embeddings)
    if not enable_embeddings:
        logging.info("Embeddings not enabled, skipping.")
        return ('Invalid request on embedding ingestion.', 400)

    node_types = FLAGS.node_types
    try:
        logging.info(f"Job started. Fetching all nodes for types: {node_types}")
        timestamp = get_latest_lock_timestamp(spanner.database)
        nodes = get_updated_nodes(spanner.database, timestamp, node_types)
        converted_nodes = filter_and_convert_nodes(nodes)
        affected_rows = generate_embeddings_partitioned(spanner.database, converted_nodes)
        return (f"OK [Affected rows: {affected_rows}]", 200)
    except Exception as e:
        logging.error(f"Embedding ingestion failed: {e}")
        return (f"Error: {e}", 500)
```
1 change: 1 addition & 0 deletions
import-automation/workflow/ingestion-helper/handlers/helpers/__init__.py
@@ -0,0 +1 @@
```python
# Initialize helpers package
```
File renamed without changes.
File renamed without changes.
File renamed without changes.
99 changes: 99 additions & 0 deletions
import-automation/workflow/ingestion-helper/handlers/imports.py
@@ -0,0 +1,99 @@
```python
import logging
import os
from flask import jsonify
from absl import flags
from .helpers import import_utils
from .utils import validate_params

FLAGS = flags.FLAGS

def handle_get_import_info(spanner, request_json):
    """Gets the details of imports that are ready for ingestion."""
    import_list = request_json.get('importList', [])
    import_info = spanner.get_import_info(import_list)
    return jsonify(import_info)

def handle_update_ingestion_status(spanner, request_json):
    """Updates the status of imports after ingestion."""
    validation_error = validate_params(
        request_json, ['importList', 'workflowId', 'jobId', 'status'])
    if validation_error:
        return (validation_error, 400)

    import_list = request_json['importList']
    workflow_id = request_json['workflowId']
    status = request_json['status']
    job_id = request_json['jobId']
    ingested_imports = [item['importName'] for item in import_list]

    spanner.update_ingestion_status(ingested_imports, workflow_id, status)
    metrics = import_utils.get_ingestion_metrics(FLAGS.project_id,
                                                 FLAGS.location, job_id)
    spanner.update_ingestion_history(workflow_id, job_id, ingested_imports,
                                     metrics)
    if status == 'SUCCESS':
        spanner.update_import_version_history(import_list, workflow_id)
    return ('OK', 200)

def handle_update_import_status(spanner, storage, request_json):
    """Updates the status of a specific import job."""
    validation_error = validate_params(request_json,
                                       ['importName', 'status'])
    if validation_error:
        return (validation_error, 400)

    import_name = request_json['importName']
    status = request_json['status']
    logging.info(f'Updating import {import_name} to status {status}')
    params = import_utils.get_import_params(request_json)
    next_refresh = import_utils.get_next_refresh(FLAGS.project_id,
                                                 FLAGS.location,
                                                 import_name)
    if next_refresh:
        params['next_refresh'] = next_refresh
    if status == 'STAGING':
        version = os.path.basename(request_json.get('latestVersion', ''))
        if not version:
            return (f'Empty version for import {import_name}', 500)
        storage.update_version_file(import_name, version, is_staging=True)
        storage.update_provenance_file(import_name, version)
        storage.update_import_summary(params)
        storage.update_version_file(import_name, version, is_staging=False)
        comment = f"import-workflow:{request_json.get('jobId','')}"
        spanner.update_version_history(import_name, version, comment)
    spanner.update_import_status(params)
    return ('OK', 200)

def handle_update_import_version(spanner, storage, request, request_json):
    """Updates the version and status of an import."""
    validation_error = validate_params(
        request_json, ['importName', 'version', 'comment'])
    if validation_error:
        return (validation_error, 400)

    import_name = request_json['importName']
    version = request_json['version']
    comment = request_json['comment']
    logging.info(
        f"Updating import {import_name} to version {version} comment:{comment}"
    )
    override = request_json.get('override', False)
    if version == 'STAGING':
        version = storage.get_staging_version(import_name)
    summary = storage.get_import_summary(import_name, version)
    params = import_utils.get_import_params(summary)
    if override:
        params['status'] = 'STAGING'
        caller = import_utils.get_caller_identity(request)
        comment = f'version-override:{caller} {comment}'
    if params['status'] == 'STAGING':
        storage.update_provenance_file(import_name, version)
        storage.update_version_file(import_name, version, is_staging=False)
        spanner.update_version_history(import_name, version, comment)
        logging.info(f"Updated import {import_name} to version {version}")
    else:
        logging.info(f"Skipping {import_name} version update")
    spanner.update_import_status(params)
    return (
        f"OK [Import: {import_name} Version: {version} Status: {params['status']}]",
        200)
```
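In `handle_update_import_status`, the version is the last path component of `latestVersion`, extracted with `os.path.basename`. The short illustration below shows how that call behaves. The bucket path is a made-up example and `extract_version` is a hypothetical wrapper, not a function in the PR.

```python
import os


def extract_version(request_json):
    """Mirror the handler's expression; hypothetical wrapper for illustration."""
    return os.path.basename(request_json.get('latestVersion', ''))


# Made-up example path; real paths depend on the import configuration.
normal = extract_version({'latestVersion': 'gs://example-bucket/foo/2024_01_15'})
trailing_slash = extract_version({'latestVersion': 'gs://example-bucket/foo/2024_01_15/'})
missing = extract_version({})
```

Here `normal` is `'2024_01_15'`, while both `trailing_slash` and `missing` are empty strings, so a `latestVersion` that ends in `/` would take the handler's "Empty version" branch.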
27 changes: 27 additions & 0 deletions
import-automation/workflow/ingestion-helper/handlers/lock.py
@@ -0,0 +1,27 @@
```python
import logging
from .utils import validate_params

def handle_acquire_lock(spanner, request_json):
    """Attempts to acquire the global lock for ingestion."""
    validation_error = validate_params(request_json, ['workflowId', 'timeout'])
    if validation_error:
        return (validation_error, 400)

    workflow = request_json['workflowId']
    timeout = request_json['timeout']
    status = spanner.acquire_lock(workflow, timeout)
    if not status:
        return ('Failed to acquire lock', 500)
    return ('OK', 200)

def handle_release_lock(spanner, request_json):
    """Releases the global ingestion lock."""
    validation_error = validate_params(request_json, ['workflowId'])
    if validation_error:
        return (validation_error, 400)

    workflow = request_json['workflowId']
    status = spanner.release_lock(workflow)
    if not status:
        return ('Failed to release lock', 500)
    return ('OK', 200)
```
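Because the lock handlers now take `spanner` as a parameter, they can be exercised without a real database. The sketch below copies `validate_params` and `handle_acquire_lock` from the diff so that it runs standalone. `FakeSpanner` is a hypothetical in-memory stub whose locking rule (free lock succeeds, held lock fails, timeout ignored) is an assumption and not the behavior of the real Spanner client.

```python
def validate_params(request_json, required_params):
    """Copied from handlers/utils.py in the diff."""
    for param in required_params:
        if param not in request_json:
            return f"'{param}' parameter is missing"
    return None


def handle_acquire_lock(spanner, request_json):
    """Copied from handlers/lock.py in the diff."""
    validation_error = validate_params(request_json, ['workflowId', 'timeout'])
    if validation_error:
        return (validation_error, 400)
    workflow = request_json['workflowId']
    timeout = request_json['timeout']
    status = spanner.acquire_lock(workflow, timeout)
    if not status:
        return ('Failed to acquire lock', 500)
    return ('OK', 200)


class FakeSpanner:
    """Hypothetical stub: the lock is granted only while nobody holds it."""

    def __init__(self):
        self.owner = None

    def acquire_lock(self, workflow_id, timeout):
        # Assumption for illustration: the timeout is ignored entirely.
        if self.owner is None:
            self.owner = workflow_id
            return True
        return False


spanner = FakeSpanner()
missing_param = handle_acquire_lock(spanner, {'workflowId': 'wf-1'})
first = handle_acquire_lock(spanner, {'workflowId': 'wf-1', 'timeout': 60})
second = handle_acquire_lock(spanner, {'workflowId': 'wf-2', 'timeout': 60})
```

The three calls cover the handler's three return paths: a 400 for the missing `timeout`, a 200 for the first acquisition, and a 500 once the stub reports the lock as held.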
6 changes: 6 additions & 0 deletions
import-automation/workflow/ingestion-helper/handlers/utils.py
@@ -0,0 +1,6 @@
```python
def validate_params(request_json, required_params):
    """Validates that required parameters are present in the request JSON."""
    for param in required_params:
        if param not in request_json:
            return f"'{param}' parameter is missing"
    return None
```
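Two properties of `validate_params` are worth noting when reviewing the handlers that call it: it reports only the first missing key, and it checks key presence rather than value, so a key set to `None` or an empty string passes. The example below repeats the function from the diff so that it runs on its own; the request payloads are made up.

```python
def validate_params(request_json, required_params):
    """Validates that required parameters are present in the request JSON."""
    for param in required_params:
        if param not in request_json:
            return f"'{param}' parameter is missing"
    return None


# 'version' and 'comment' are both absent; only the first is reported.
first_missing = validate_params({'importName': 'foo'},
                                ['importName', 'version', 'comment'])

# Keys are present, so validation passes even though the values are empty.
empty_values = validate_params({'importName': None, 'status': ''},
                               ['importName', 'status'])
```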
The `AggregationUtils` constructor should be updated to include the `location` and `is_base_dc` parameters. This ensures that environment-specific configurations for the `BigQueryExecutor` and `LinkedEdgeGenerator` are handled within the class constructor, promoting modularity and preventing incorrect defaults.
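A minimal sketch of what this suggestion might look like, assuming the two new values are passed as required keyword arguments. `AggregationUtilsSketch` is a hypothetical stand-in: the real `AggregationUtils` class is not shown in this diff, and how it wires `location` and `is_base_dc` into `BigQueryExecutor` and `LinkedEdgeGenerator` is not reproduced here. The argument values are placeholders.

```python
class AggregationUtilsSketch:
    """Hypothetical stand-in for AggregationUtils; only the constructor is sketched."""

    def __init__(self, *, connection_id, project_id, instance_id, database_id,
                 location, is_base_dc):
        self.connection_id = connection_id
        self.project_id = project_id
        self.instance_id = instance_id
        self.database_id = database_id
        # Required keyword arguments with no defaults, so a caller cannot
        # silently fall back to a setting meant for another environment.
        self.location = location
        self.is_base_dc = is_base_dc


# Placeholder values for illustration only.
aggregation = AggregationUtilsSketch(
    connection_id='example-connection',
    project_id='example-project',
    instance_id='example-instance',
    database_id='example-database',
    location='us-central1',
    is_base_dc=False,
)
```

Omitting either new argument raises a `TypeError` at construction time, which surfaces a misconfigured caller immediately rather than during aggregation.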