From 097f02cdd3dd7ecea3ab64a7f08e4f1fec381253 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Wed, 27 May 2026 07:43:46 -0400 Subject: [PATCH 1/2] Revert "fix: Skip "dc/base/" prefix for custom data instances (#2025)" This reverts commit 83f030c3dea39b54714b81c9a23e3bdb6365f4db. --- .../ingestion-helper/aggregation_utils.py | 44 ++++++------------- .../workflow/ingestion-helper/main.py | 5 --- 2 files changed, 13 insertions(+), 36 deletions(-) diff --git a/import-automation/workflow/ingestion-helper/aggregation_utils.py b/import-automation/workflow/ingestion-helper/aggregation_utils.py index 0993ac32b8..c26a280125 100644 --- a/import-automation/workflow/ingestion-helper/aggregation_utils.py +++ b/import-automation/workflow/ingestion-helper/aggregation_utils.py @@ -66,9 +66,8 @@ def execute(self, query: str, job_config: Optional[bigquery.QueryJobConfig] = No class LinkedEdgeGenerator: """Generates and ingests linked relationship edges (e.g., transitive closures) into Spanner for faster lookup.""" - def __init__(self, executor: BigQueryExecutor, is_base_dc: bool = True) -> None: + def __init__(self, executor: BigQueryExecutor) -> None: self.executor = executor - self.is_base_dc = is_base_dc def run_all(self, import_names: List[str] = None) -> None: """Runs all global aggregations in sequence.""" @@ -90,12 +89,8 @@ def run_linked_contained_in_place(self, import_names: List[str] = None) -> None: return dest = self.executor.get_spanner_destination_uri() - # Escape single quotes to prevent SQL injection - safe_names = [name.replace("'", "''") for name in import_names] - prefix = "dc/base/" if self.is_base_dc else "" - provenances = [f"'{prefix}{name}'" for name in safe_names] + provenances = [f"'dc/base/{name}'" for name in import_names] provenance_filter = f" AND provenance IN ({', '.join(provenances)})" - gen_graphs_prov = 'dc/base/GeneratedGraphs' if self.is_base_dc else 'GeneratedGraphs' query = f""" -- Pull base edges needed for containedInPlace aggregation @@ -142,7 +137,7 @@ def run_linked_contained_in_place(self, import_names: List[str] = None) -> None: subject_id, 'linkedContainedInPlace' as predicate, ancestor_place as object_id, - '{gen_graphs_prov}' as provenance + 'dc/base/GeneratedGraphs' as provenance FROM Ancestors ), @@ -179,12 +174,8 @@ def run_linked_member_of(self, import_names: List[str] = None) -> None: return dest = self.executor.get_spanner_destination_uri() - # Escape single quotes to prevent SQL injection - safe_names = [name.replace("'", "''") for name in import_names] - prefix = "dc/base/" if self.is_base_dc else "" - provenances = [f"'{prefix}{name}'" for name in safe_names] + provenances = [f"'dc/base/{name}'" for name in import_names] provenance_filter = f" AND provenance IN ({', '.join(provenances)})" - gen_graphs_prov = 'dc/base/GeneratedGraphs' if self.is_base_dc else 'GeneratedGraphs' query = f""" -- Pull base edges needed for memberOf aggregation @@ -234,7 +225,7 @@ def run_linked_member_of(self, import_names: List[str] = None) -> None: subject_id, 'linkedMemberOf' as predicate, ancestor as object_id, - '{gen_graphs_prov}' as provenance + 'dc/base/GeneratedGraphs' as provenance FROM Ancestors ), @@ -271,12 +262,8 @@ def run_linked_member(self, import_names: List[str] = None) -> None: return dest = self.executor.get_spanner_destination_uri() - # Escape single quotes to prevent SQL injection - safe_names = [name.replace("'", "''") for name in import_names] - prefix = "dc/base/" if self.is_base_dc else "" - provenances = [f"'{prefix}{name}'" for name in safe_names] + provenances = [f"'dc/base/{name}'" for name in import_names] provenance_filter = f" AND provenance IN ({', '.join(provenances)})" - gen_graphs_prov = 'dc/base/GeneratedGraphs' if self.is_base_dc else 'GeneratedGraphs' query = f""" -- Pull base edges needed for member aggregation @@ -324,7 +311,7 @@ def run_linked_member(self, import_names: List[str] = None) -> None: descendant as subject_id, 'linkedMember' as predicate, subject_id as object_id, - '{gen_graphs_prov}' as provenance + 'dc/base/GeneratedGraphs' as provenance FROM Descendants WHERE subject_id LIKE 'dc/topic%' @@ -361,9 +348,8 @@ def run_linked_member(self, import_names: List[str] = None) -> None: class ProvenanceSummaryGenerator: """Contains the SQL queries to generate ProvenanceSummary in the Cache table.""" - def __init__(self, executor: BigQueryExecutor, is_base_dc: bool = True) -> None: + def __init__(self, executor: BigQueryExecutor) -> None: self.executor = executor - self.is_base_dc = is_base_dc def run_all(self, import_names: List[str]) -> None: """Runs all provenance summary generation in sequence.""" @@ -382,11 +368,8 @@ def run_provenance_summary_aggregation(self, import_names: List[str]) -> None: dest = self.executor.get_spanner_destination_uri() connection_id = self.executor.connection_id - # Escape single quotes to prevent SQL injection - safe_names = [name.replace("'", "''") for name in import_names] # Format import names for the SQL IN clause - imports_str = ", ".join([f"'{name}'" for name in safe_names]) - provenance_dcid_expr = "CONCAT('dc/base/', raw.import_name)" if self.is_base_dc else "raw.import_name" + imports_str = ", ".join([f"'{name}'" for name in import_names]) query = f""" -- Step 1: Fetch Observation rows for the specific import @@ -450,7 +433,7 @@ def run_provenance_summary_aggregation(self, import_names: List[str]) -> None: raw.is_dc_aggregate, JSON_VALUE(v, '$.key') as date_val, SAFE_CAST(JSON_VALUE(v, '$.value') AS FLOAT64) as value_num, - {provenance_dcid_expr} as provenance_dcid, + CONCAT('dc/base/', raw.import_name) as provenance_dcid, nodes.name as place_name, edges.place_type FROM `temp_obs_raw` raw @@ -581,8 +564,7 @@ def __init__(self, project_id: str, instance_id: str, database_id: str, - location: Optional[str] = None, - is_base_dc: bool = True) -> None: + location: Optional[str] = None) -> None: self.executor = BigQueryExecutor( connection_id=connection_id, project_id=project_id, @@ -590,8 +572,8 @@ def __init__(self, database_id=database_id, location=location ) - self.linked_edge_generator = LinkedEdgeGenerator(self.executor, is_base_dc) - self.provenance_summary_generator = ProvenanceSummaryGenerator(self.executor, is_base_dc) + self.linked_edge_generator = LinkedEdgeGenerator(self.executor) + self.provenance_summary_generator = ProvenanceSummaryGenerator(self.executor) def run_aggregation(self, import_list: List[Dict[str, Any]]) -> bool: """ diff --git a/import-automation/workflow/ingestion-helper/main.py b/import-automation/workflow/ingestion-helper/main.py index 511d9e7108..f9024920bb 100644 --- a/import-automation/workflow/ingestion-helper/main.py +++ b/import-automation/workflow/ingestion-helper/main.py @@ -41,10 +41,6 @@ flags.DEFINE_list( 'node_types', ['StatisticalVariable', 'Topic'], 'Node types to generate embeddings for') -flags.DEFINE_bool( - 'is_base_dc', - os.environ.get('IS_BASE_DC', 'true').lower() == 'true', - 'Is base DC') if not FLAGS.is_parsed(): FLAGS(['ingestion_helper']) @@ -279,7 +275,6 @@ def ingestion_helper(request): instance_id=FLAGS.spanner_instance_id, database_id=FLAGS.spanner_graph_database_id, location=FLAGS.location, - is_base_dc=FLAGS.is_base_dc, ) try: if aggregation.run_aggregation(import_list): From 12e4ee3a7e122454f4454404304ef62dd347eadb Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Wed, 27 May 2026 17:20:30 -0400 Subject: [PATCH 2/2] Just do a partial revert --- .../ingestion-helper/aggregation_utils.py | 15 +++++++++------ .../workflow/ingestion-helper/main.py | 5 +++++ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/import-automation/workflow/ingestion-helper/aggregation_utils.py b/import-automation/workflow/ingestion-helper/aggregation_utils.py index c26a280125..dbb9632dcb 100644 --- a/import-automation/workflow/ingestion-helper/aggregation_utils.py +++ b/import-automation/workflow/ingestion-helper/aggregation_utils.py @@ -66,8 +66,9 @@ def execute(self, query: str, job_config: Optional[bigquery.QueryJobConfig] = No class LinkedEdgeGenerator: """Generates and ingests linked relationship edges (e.g., transitive closures) into Spanner for faster lookup.""" - def __init__(self, executor: BigQueryExecutor) -> None: + def __init__(self, executor: BigQueryExecutor, is_base_dc: bool = True) -> None: self.executor = executor + self.is_base_dc = is_base_dc def run_all(self, import_names: List[str] = None) -> None: """Runs all global aggregations in sequence.""" @@ -348,8 +349,9 @@ def run_linked_member(self, import_names: List[str] = None) -> None: class ProvenanceSummaryGenerator: """Contains the SQL queries to generate ProvenanceSummary in the Cache table.""" - def __init__(self, executor: BigQueryExecutor) -> None: + def __init__(self, executor: BigQueryExecutor, is_base_dc: bool = True) -> None: self.executor = executor + self.is_base_dc = is_base_dc def run_all(self, import_names: List[str]) -> None: """Runs all provenance summary generation in sequence.""" @@ -564,16 +566,17 @@ def __init__(self, project_id: str, instance_id: str, database_id: str, - location: Optional[str] = None) -> None: + location: Optional[str] = None, + is_base_dc: bool = True) -> None: self.executor = BigQueryExecutor( connection_id=connection_id, project_id=project_id, instance_id=instance_id, database_id=database_id, - location=location + location=location, ) - self.linked_edge_generator = LinkedEdgeGenerator(self.executor) - self.provenance_summary_generator = ProvenanceSummaryGenerator(self.executor) + self.linked_edge_generator = LinkedEdgeGenerator(self.executor, is_base_dc) + self.provenance_summary_generator = ProvenanceSummaryGenerator(self.executor, is_base_dc) def run_aggregation(self, import_list: List[Dict[str, Any]]) -> bool: """ diff --git a/import-automation/workflow/ingestion-helper/main.py b/import-automation/workflow/ingestion-helper/main.py index f9024920bb..511d9e7108 100644 --- a/import-automation/workflow/ingestion-helper/main.py +++ b/import-automation/workflow/ingestion-helper/main.py @@ -41,6 +41,10 @@ flags.DEFINE_list( 'node_types', ['StatisticalVariable', 'Topic'], 'Node types to generate embeddings for') +flags.DEFINE_bool( + 'is_base_dc', + os.environ.get('IS_BASE_DC', 'true').lower() == 'true', + 'Is base DC') if not FLAGS.is_parsed(): FLAGS(['ingestion_helper']) @@ -275,6 +279,7 @@ def ingestion_helper(request): instance_id=FLAGS.spanner_instance_id, database_id=FLAGS.spanner_graph_database_id, location=FLAGS.location, + is_base_dc=FLAGS.is_base_dc, ) try: if aggregation.run_aggregation(import_list):