From c641f9fb405298ad06bcb71f3d7b0e9cc82b0600 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Tue, 30 Dec 2025 19:53:11 +0530 Subject: [PATCH 1/5] Support 0 eval results --- tools/statvar_importer/property_value_mapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/statvar_importer/property_value_mapper.py b/tools/statvar_importer/property_value_mapper.py index 76d5209216..8c60f481a3 100644 --- a/tools/statvar_importer/property_value_mapper.py +++ b/tools/statvar_importer/property_value_mapper.py @@ -347,7 +347,7 @@ def _process_eval(self, pvs: dict, data_key: str) -> bool: self._log_every_n) if not eval_prop: eval_prop = data_key - if eval_data and eval_data != eval_str: + if eval_data is not None and eval_data != eval_str: pvs[eval_prop] = eval_data self._counters.add_counter('processed-eval', 1, eval_str) pvs.pop(eval_key) From f30e89550743d537d0aed8f59e3031cd2452cf19 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Wed, 25 Mar 2026 16:32:16 +0530 Subject: [PATCH 2/5] Use environment variable for DC API root --- util/dc_api_wrapper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index f1a1fee247..98f7209d0e 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -15,7 +15,7 @@ It uses the DataCommonsClient library module for DC APIs and adds support for batched requests, retries and HTTP caching. -DC API requires an environment variable set for DC_API_KEY. +DC API requires an environment variable set for DC_API_KEY and DC_API_ROOT. Please refer to https://docs.datacommons.org/api/python/v2 for more details. """ @@ -265,7 +265,7 @@ def get_datacommons_client(config: dict = None) -> DataCommonsClient: """Returns a DataCommonsClient object initialized using config.""" config = _validate_v2_config(config) api_key = get_dc_api_key(config) - dc_instance = config.get('dc_api_root') + dc_instance = config.get('dc_api_root', os.environ.get('DC_API_ROOT')) url = None # Check if API root is a host or url endpoint. if dc_instance: From 2a0e93d8f73aeef6ff2b9b48bf9b5b819ed99182 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Wed, 25 Mar 2026 22:17:08 +0530 Subject: [PATCH 3/5] cleanup dc_api_root configs --- scripts/earthengine/utils.py | 2 -- scripts/earthengine/utils_test.py | 1 - .../common/datacommons_api_wrappers/datacommons_wrappers.py | 3 --- .../datacommons_api_wrappers/datacommons_wrappers_test.py | 2 -- .../india_rbi_state_statistics/environment_sdg_metadata.csv | 1 - .../india_rbi_state_statistics/infrastructure_metadata.csv | 1 - .../india_rbi_state_statistics/rbi_metadata.csv | 5 ----- .../denmark_demographics/denmark_demographics_metadata.csv | 1 - .../fema/flood_insurance_claims/us_flood_nfip_config.py | 3 --- .../state_domestic_product_metadata.csv | 1 - .../statistics_poland/StatisticsPoland_metadata.csv | 3 --- .../ap_ib_gt_enrollment/config/common_metadata.csv | 1 - .../state/config/SATorACT_Participation_metadata.csv | 1 - tools/statvar_importer/config_flags.py | 2 +- util/dc_api_wrapper.py | 3 ++- 15 files changed, 3 insertions(+), 27 deletions(-) diff --git a/scripts/earthengine/utils.py b/scripts/earthengine/utils.py index 44dbd71535..7e59202eaa 100644 --- a/scripts/earthengine/utils.py +++ b/scripts/earthengine/utils.py @@ -46,7 +46,6 @@ # Constants _MAX_LATITUDE = 90.0 _MAX_LONGITUDE = 180.0 -_DC_API_ROOT = 'https://api.datacommons.org' # Utilities for dicts. @@ -372,7 +371,6 @@ def place_id_to_lat_lng(placeid: str, { 'dc_api_version': 'V2', 'dc_api_use_cache': True, - 'dc_api_root': _DC_API_ROOT, }, ) node_props = resp.get(placeid) if resp else None diff --git a/scripts/earthengine/utils_test.py b/scripts/earthengine/utils_test.py index bfdd347661..f0f8e567cd 100644 --- a/scripts/earthengine/utils_test.py +++ b/scripts/earthengine/utils_test.py @@ -394,5 +394,4 @@ def test_place_id_to_lat_lng_dc_api(self): [placeid], ['latitude', 'longitude'], { 'dc_api_version': 'V2', 'dc_api_use_cache': True, - 'dc_api_root': utils._DC_API_ROOT, }) diff --git a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py index 39c93bc599..70e0936ee5 100644 --- a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py +++ b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py @@ -64,9 +64,6 @@ def dc_check_existence(dcid_list: list, wrapper_config = { 'dc_api_batch_size': max_items, - 'dc_api_root': - 'https://autopush.api.datacommons.org' - if use_autopush else 'https://api.datacommons.org' } return dc_api_is_defined_dcid(dcid_list, wrapper_config) diff --git a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py index 5eb9d2a497..e0374c7010 100644 --- a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py +++ b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py @@ -37,14 +37,12 @@ def test_dc_check_existence_mock(self, mock_is_defined): mock_is_defined.assert_called_with( ['node1'], { 'dc_api_batch_size': 450, - 'dc_api_root': 'https://autopush.api.datacommons.org' }) # Test 2: use_autopush=False dc_check_existence(['node2'], use_autopush=False, max_items=10) mock_is_defined.assert_called_with(['node2'], { 'dc_api_batch_size': 10, - 'dc_api_root': 'https://api.datacommons.org' }) @mock.patch('datacommons_wrappers.request_post_json') diff --git a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/environment_sdg_metadata.csv b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/environment_sdg_metadata.csv index 782aa4c330..f11ac0b3c4 100644 --- a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/environment_sdg_metadata.csv +++ b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/environment_sdg_metadata.csv @@ -2,4 +2,3 @@ parameter,value header_rows,3 output_columns,"observationAbout,observationDate,variableMeasured,value,unit,observationPeriod" mapped_rows,3 -dc_api_root,https://api.datacommons.org diff --git a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/infrastructure_metadata.csv b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/infrastructure_metadata.csv index 475c900919..f5c45d8c4e 100644 --- a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/infrastructure_metadata.csv +++ b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/infrastructure_metadata.csv @@ -2,4 +2,3 @@ parameter,value header_rows,5 output_columns,"observationAbout,observationDate,variableMeasured,value,unit,observationPeriod" mapped_rows,5 -dc_api_root,https://api.datacommons.org diff --git a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/rbi_metadata.csv b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/rbi_metadata.csv index ad0d50f768..c2042f4fd4 100644 --- a/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/rbi_metadata.csv +++ b/statvar_imports/database_on_indian_economy/india_rbi_state_statistics/rbi_metadata.csv @@ -2,8 +2,3 @@ parameter,value output_columns,"observationAbout,observationDate,variableMeasured,value,unit,observationPeriod" header_rows,4 mapped_rows,4 -dc_api_root,https://api.datacommons.org - - - - diff --git a/statvar_imports/denmark_demographics/denmark_demographics_metadata.csv b/statvar_imports/denmark_demographics/denmark_demographics_metadata.csv index 41f8f31e37..95d252a541 100644 --- a/statvar_imports/denmark_demographics/denmark_demographics_metadata.csv +++ b/statvar_imports/denmark_demographics/denmark_demographics_metadata.csv @@ -1,3 +1,2 @@ parameter,value output_columns,"observationDate,value,observationAbout,variableMeasured" -dc_api_root,https://api.datacommons.org diff --git a/statvar_imports/fema/flood_insurance_claims/us_flood_nfip_config.py b/statvar_imports/fema/flood_insurance_claims/us_flood_nfip_config.py index 90e53db883..082ce8b7e3 100644 --- a/statvar_imports/fema/flood_insurance_claims/us_flood_nfip_config.py +++ b/statvar_imports/fema/flood_insurance_claims/us_flood_nfip_config.py @@ -68,7 +68,4 @@ 5, 'dc_api_use_cache': True, - #'dc_api_root': 'http://autopush.api.datacommons.org', - 'dc_api_root': - 'http://api.datacommons.org', } diff --git a/statvar_imports/india_rbistatedomesticproduct/state_domestic_product_metadata.csv b/statvar_imports/india_rbistatedomesticproduct/state_domestic_product_metadata.csv index ee630bdff4..0c90bd2702 100644 --- a/statvar_imports/india_rbistatedomesticproduct/state_domestic_product_metadata.csv +++ b/statvar_imports/india_rbistatedomesticproduct/state_domestic_product_metadata.csv @@ -11,4 +11,3 @@ comments, output_columns,"observationAbout,observationDate,variableMeasured,value,unit,measurementMethod,observationPeriod" #header_rows,6 #mapped_rows,5 -dc_api_root,https://api.datacommons.org diff --git a/statvar_imports/statistics_poland/StatisticsPoland_metadata.csv b/statvar_imports/statistics_poland/StatisticsPoland_metadata.csv index b909a13a08..a3a30ec1c2 100644 --- a/statvar_imports/statistics_poland/StatisticsPoland_metadata.csv +++ b/statvar_imports/statistics_poland/StatisticsPoland_metadata.csv @@ -9,6 +9,3 @@ places_within,country/POL #skip_rows,1 header_rows,5 mapped_columns,2 -dc_api_root,https://api.datacommons.org - - diff --git a/statvar_imports/us_urban_school/ap_ib_gt_enrollment/config/common_metadata.csv b/statvar_imports/us_urban_school/ap_ib_gt_enrollment/config/common_metadata.csv index 41a321a836..2c8f80a15c 100644 --- a/statvar_imports/us_urban_school/ap_ib_gt_enrollment/config/common_metadata.csv +++ b/statvar_imports/us_urban_school/ap_ib_gt_enrollment/config/common_metadata.csv @@ -3,4 +3,3 @@ mapped_rows,1 output_columns,"observationDate,observationAbout,variableMeasured,value" #input_rows,10 mapped_columns,2 -dc_api_root,https://api.datacommons.org diff --git a/statvar_imports/us_urban_school/sat_act_participation/state/config/SATorACT_Participation_metadata.csv b/statvar_imports/us_urban_school/sat_act_participation/state/config/SATorACT_Participation_metadata.csv index 4909fa4a53..75997951a4 100644 --- a/statvar_imports/us_urban_school/sat_act_participation/state/config/SATorACT_Participation_metadata.csv +++ b/statvar_imports/us_urban_school/sat_act_participation/state/config/SATorACT_Participation_metadata.csv @@ -1,3 +1,2 @@ parameter,value output_columns,"observationAbout,observationDate,value,variableMeasured,unit,scalingFactor" -dc_api_root,https://api.datacommons.org diff --git a/tools/statvar_importer/config_flags.py b/tools/statvar_importer/config_flags.py index d5214a3510..94a162f33c 100644 --- a/tools/statvar_importer/config_flags.py +++ b/tools/statvar_importer/config_flags.py @@ -370,7 +370,7 @@ def get_default_config() -> dict: True, # Settings for DC API. 'dc_api_root': - 'http://api.datacommons.org', + os.environ.get('DC_API_ROOT', 'http://api.datacommons.org'), 'dc_api_use_cache': False, 'dc_api_batch_size': diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index 98f7209d0e..682d4aeeaf 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -520,7 +520,8 @@ def dc_api_resolve_latlng(lat_lngs: list, dictionary containing the resolved place information. """ config = _validate_v2_config(config) - api_root = config.get('dc_api_root', _DEFAULT_API_ROOT) + api_root = config.get('dc_api_root', + os.environ.get('DC_API_ROOT', _DEFAULT_API_ROOT)) v1_data = {} v1_data['coordinates'] = lat_lngs num_ids = len(lat_lngs) From dfad7506200cbf30ee076a9e258e975460c965a2 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Wed, 25 Mar 2026 23:12:48 +0530 Subject: [PATCH 4/5] lint fix --- scripts/earthengine/utils.py | 12 +-- scripts/earthengine/utils_test.py | 10 +- .../datacommons_wrappers.py | 3 +- .../datacommons_wrappers_test.py | 7 +- tools/statvar_importer/config_flags.py | 2 +- util/dc_api_wrapper.py | 100 +++++++++++------- 6 files changed, 77 insertions(+), 57 deletions(-) diff --git a/scripts/earthengine/utils.py b/scripts/earthengine/utils.py index 7e59202eaa..aeb94045ac 100644 --- a/scripts/earthengine/utils.py +++ b/scripts/earthengine/utils.py @@ -19,11 +19,11 @@ from datetime import datetime import glob import os +from pathlib import Path import pickle import re import sys import tempfile -from pathlib import Path from typing import Union from absl import logging @@ -305,8 +305,8 @@ def grid_get_neighbor_ids(grid_id: str) -> list: if lat_offset != 0 or lng_offset != 0: neighbour_lat = lat + lat_offset * deg neighbour_lng = lng + lng_offset * deg - if abs(neighbour_lat) < _MAX_LATITUDE and abs( - neighbour_lng) < _MAX_LONGITUDE: + if (abs(neighbour_lat) < _MAX_LATITUDE and + abs(neighbour_lng) < _MAX_LONGITUDE): neighbours.append( grid_id_from_lat_lng( deg, @@ -433,7 +433,7 @@ def add_namespace(dcid: str, prefix: str = 'dcid:') -> str: def str_get_numeric_value( - value: Union[str, list, int, float]) -> Union[int, float, None]: + value: Union[str, list, int, float],) -> Union[int, float, None]: """Returns the numeric value from input string or None.""" if isinstance(value, list): value = value[0] @@ -528,7 +528,7 @@ def date_advance_by_period(date_str: str, if not date_str: return '' dt = datetime.strptime(date_str, date_format) - (delta, unit) = date_parse_time_period(time_period) + delta, unit = date_parse_time_period(time_period) if not delta or not unit: logging.error( f'Unable to parse time period: {time_period} for date: {date_str}') @@ -545,7 +545,7 @@ def date_format_by_time_period(date_str: str, time_period: str) -> str: """ if not time_period: return date_str - (delta, unit) = date_parse_time_period(time_period) + delta, unit = date_parse_time_period(time_period) date_parts = date_str.split('-') if unit == 'years': return date_parts[0] diff --git a/scripts/earthengine/utils_test.py b/scripts/earthengine/utils_test.py index f0f8e567cd..e93be53fea 100644 --- a/scripts/earthengine/utils_test.py +++ b/scripts/earthengine/utils_test.py @@ -390,8 +390,8 @@ def test_place_id_to_lat_lng_dc_api(self): lat, lng = utils.place_id_to_lat_lng(placeid, dc_api_lookup=True) self.assertAlmostEqual(37.221614, lat) self.assertAlmostEqual(-121.68954, lng) - mock_get.assert_called_once_with( - [placeid], ['latitude', 'longitude'], { - 'dc_api_version': 'V2', - 'dc_api_use_cache': True, - }) + mock_get.assert_called_once_with([placeid], + ['latitude', 'longitude'], { + 'dc_api_version': 'V2', + 'dc_api_use_cache': True, + }) diff --git a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py index 70e0936ee5..eb0e487f2a 100644 --- a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py +++ b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers.py @@ -62,8 +62,7 @@ def dc_check_existence(dcid_list: list, Dict object with dcids as key values and boolean values signifying existence as values. """ wrapper_config = { - 'dc_api_batch_size': - max_items, + 'dc_api_batch_size': max_items, } return dc_api_is_defined_dcid(dcid_list, wrapper_config) diff --git a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py index e0374c7010..e605afb5a9 100644 --- a/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py +++ b/scripts/us_census/acs5yr/subject_tables/common/datacommons_api_wrappers/datacommons_wrappers_test.py @@ -34,10 +34,9 @@ def test_dc_check_existence_mock(self, mock_is_defined): # Test 1: Default (use_autopush=True by default in function signature) mock_is_defined.return_value = {'node1': True} dc_check_existence(['node1']) - mock_is_defined.assert_called_with( - ['node1'], { - 'dc_api_batch_size': 450, - }) + mock_is_defined.assert_called_with(['node1'], { + 'dc_api_batch_size': 450, + }) # Test 2: use_autopush=False dc_check_existence(['node2'], use_autopush=False, max_items=10) diff --git a/tools/statvar_importer/config_flags.py b/tools/statvar_importer/config_flags.py index 94a162f33c..a7449ca032 100644 --- a/tools/statvar_importer/config_flags.py +++ b/tools/statvar_importer/config_flags.py @@ -370,7 +370,7 @@ def get_default_config() -> dict: True, # Settings for DC API. 'dc_api_root': - os.environ.get('DC_API_ROOT', 'http://api.datacommons.org'), + os.environ.get('DC_API_ROOT', 'https://api.datacommons.org'), 'dc_api_use_cache': False, 'dc_api_batch_size': diff --git a/util/dc_api_wrapper.py b/util/dc_api_wrapper.py index 682d4aeeaf..1265452f8c 100644 --- a/util/dc_api_wrapper.py +++ b/util/dc_api_wrapper.py @@ -21,16 +21,21 @@ import os import sys -import urllib -import requests from typing import Union +import urllib from absl import logging from datacommons_client.client import DataCommonsClient from datacommons_client.utils.error_handling import APIError, DCConnectionError, DCStatusError +import requests import requests_cache -from tenacity import (RetryCallState, Retrying, retry_if_exception, - stop_after_attempt, wait_fixed) +from tenacity import ( + RetryCallState, + Retrying, + retry_if_exception, + stop_after_attempt, + wait_fixed, +) _SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPT_DIR) @@ -64,8 +69,14 @@ def _get_exception_status_code(exception): def _should_retry_exception(exception: Exception) -> bool: - if isinstance(exception, (DCConnectionError, requests.exceptions.Timeout, - requests.exceptions.ChunkedEncodingError)): + if isinstance( + exception, + ( + DCConnectionError, + requests.exceptions.Timeout, + requests.exceptions.ChunkedEncodingError, + ), + ): return True if isinstance(exception, (urllib.error.HTTPError, DCStatusError, APIError)): status_code = _get_exception_status_code(exception) @@ -105,6 +116,7 @@ def dc_api_wrapper( retries: Maximum number of attempts (including the first attempt). retry_sec: Interval in seconds between retries for which caller is blocked. use_cache: If True, uses request cache for faster response. + Returns: The response from the DataCommons API call. """ @@ -147,8 +159,9 @@ def dc_api_wrapper( logging.error(f'Got exception for api: {function}, {e}') return None except Exception as e: - e.add_note(f'DC API call failed for {function} with max attempts ' - f'{max_attempts}.') + e.add_note( + f'DC API call failed for {function} with max attempts {max_attempts}.' + ) raise @@ -252,11 +265,13 @@ def get_dc_api_key(config: dict = None) -> str: api_key = config.get('dc_api_key', os.environ.get('DC_API_KEY')) if not api_key: logging.log_first_n( - logging.WARNING, f'Using default DC API key with limited quota. ' - 'Please set an API key in the environment variable: DC_API_KEY.' - 'Refer https://docs.datacommons.org/api/python/v2/#authentication ' - 'for more details.', - n=1) + logging.WARNING, + f'Using default DC API key with limited quota. ' + f'Please set an API key in the environment variable: DC_API_KEY.' + f'Refer https://docs.datacommons.org/api/python/v2/#authentication ' + f'for more details.', + n=1, + ) api_key = _DEFAULT_DC_API_KEY return api_key @@ -286,6 +301,7 @@ def get_datacommons_client(config: dict = None) -> DataCommonsClient: def dc_api_is_defined_dcid(dcids: list, config: dict = {}) -> dict: """Returns a dictionary with dcids mapped to True/False based on whether + the dcid is defined in the API and has a 'typeOf' property. Uses the property_value() DC API to lookup 'typeOf' for each dcid. dcids not defined in KG get a value of False. @@ -301,11 +317,13 @@ def dc_api_is_defined_dcid(dcids: list, config: dict = {}) -> dict: # Set parameters for node API. client = get_datacommons_client(config) api_function = client.node.fetch_property_values - api_result = dc_api_batched_wrapper(function=api_function, - dcids=dcids, - args={'properties': 'typeOf'}, - dcid_arg_kw='node_dcids', - config=config) + api_result = dc_api_batched_wrapper( + function=api_function, + dcids=dcids, + args={'properties': 'typeOf'}, + dcid_arg_kw='node_dcids', + config=config, + ) response = {} for dcid in dcids: dcid_stripped = _strip_namespace(dcid) @@ -348,11 +366,13 @@ def _dc_api_get_node_property_v2(dcids: list, api_function = client.node.fetch_property_values args = {'properties': prop} dcid_arg_kw = 'node_dcids' - api_result = dc_api_batched_wrapper(function=api_function, - dcids=dcids, - args=args, - dcid_arg_kw=dcid_arg_kw, - config=config) + api_result = dc_api_batched_wrapper( + function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=config, + ) response = {} for dcid in dcids: dcid_stripped = _strip_namespace(dcid) @@ -398,11 +418,13 @@ def dc_api_get_node_property_values(dcids: list, config: dict = {}) -> dict: api_function = client.node.fetch args = {'expression': '->*'} dcid_arg_kw = 'node_dcids' - api_result = dc_api_batched_wrapper(function=api_function, - dcids=dcids, - args=args, - dcid_arg_kw=dcid_arg_kw, - config=config) + api_result = dc_api_batched_wrapper( + function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=config, + ) response = {} for dcid, arcs in api_result.items(): pvs = {} @@ -446,11 +468,13 @@ def dc_api_resolve_placeid(dcids: list, api_function = client.resolve.fetch args = {'expression': f'<-{in_prop}->dcid'} dcid_arg_kw = 'node_ids' - api_result = dc_api_batched_wrapper(function=api_function, - dcids=dcids, - args=args, - dcid_arg_kw=dcid_arg_kw, - config=config) + api_result = dc_api_batched_wrapper( + function=api_function, + dcids=dcids, + args=args, + dcid_arg_kw=dcid_arg_kw, + config=config, + ) results = {} if api_result: for node in api_result.get('entities', []): @@ -478,7 +502,7 @@ def dc_api_resolve_latlng(lat_lngs: list, } if return_v1_response is True, a v1 response of this form is returned: - + { "placeCoordinates": [ { @@ -552,8 +576,7 @@ def dc_api_resolve_latlng(lat_lngs: list, def _convert_v2_to_v1_coordinate_response(v2_response: dict) -> dict: - """Converts a v2 coordinate resolution response to a v1 response. - """ + """Converts a v2 coordinate resolution response to a v1 response.""" v1_response = {'placeCoordinates': []} for entity in v2_response.get('entities', []): node = entity.get('node', '') @@ -573,15 +596,14 @@ def _convert_v2_to_v1_coordinate_response(v2_response: dict) -> dict: candidate.get('dcid') for candidate in entity.get('candidates', []) ], - 'places': entity.get('candidates', []) + 'places': entity.get('candidates', []), } v1_response['placeCoordinates'].append(place_coordinate) return v1_response def _convert_v1_to_v2_coordinate_request(v1_request: dict) -> dict: - """Converts a v1 coordinate resolution request to a v2 request. - """ + """Converts a v1 coordinate resolution request to a v2 request.""" v2_request = {'nodes': [], 'property': '<-geoCoordinate->dcid'} for coordinate in v1_request.get('coordinates', []): lat = coordinate.get('latitude') From 41a0a611f9f3e512617771d5bb2f8ee8cba1e9f7 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Mon, 25 May 2026 16:06:59 +0530 Subject: [PATCH 5/5] scripts for baseline metrics --- .../metrics/pvmap_generator_metrics.py | 245 ++++++++ tools/statvar_importer/mcf_diff.py | 18 +- tools/statvar_importer/stat_var_processor.py | 11 +- .../statvar_importer/stat_var_test_runner.py | 524 ++++++++++++++++++ 4 files changed, 793 insertions(+), 5 deletions(-) create mode 100644 tools/agentic_import/metrics/pvmap_generator_metrics.py create mode 100644 tools/statvar_importer/stat_var_test_runner.py diff --git a/tools/agentic_import/metrics/pvmap_generator_metrics.py b/tools/agentic_import/metrics/pvmap_generator_metrics.py new file mode 100644 index 0000000000..73ceeabf5a --- /dev/null +++ b/tools/agentic_import/metrics/pvmap_generator_metrics.py @@ -0,0 +1,245 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Utilities to generate metrics for PV map generation for statvar imports. +""" + +import os +import sys +import subprocess +import tempfile +import time + +from absl import app +from absl import flags +from absl import logging + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(_SCRIPT_DIR) +sys.path.append(os.path.dirname(_SCRIPT_DIR)) +sys.path.append(os.path.dirname(os.path.dirname(_SCRIPT_DIR))) +sys.path.append( + os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(_SCRIPT_DIR)), 'util'))) +sys.path.append( + os.path.join(os.path.dirname(os.path.dirname(_SCRIPT_DIR)), + 'statvar_importer')) + +import file_util +from counters import Counters +from mcf_diff import diff_mcf_files +from mcf_file_util import get_value_list +from stat_var_test_runner import StatVarProcessorTestRunner, run_script + + +class PVMapGeneratorMetricsRunner(StatVarProcessorTestRunner): + """Class to generate metrics for PV map generation for statvar imports. + + The metrics are generated by running the statvar processor with the test + config and comparing the output with the expected output. + The files for pvmap egenration are specified in the test config as follows: + "test_name": "", + ... + "pvmap_generator_test": { + "pvmap_generator_args": { + "input_data": "", + "output_data": "", + }, + "expected_output_files": [ + "", + "", + ... + ], + } + + """ + + def __init__(self, test_config_file: str = None, test_config: dict = None): + super().__init__(test_config_file, test_config) + + def run_pvmap_generator(self, test_config: dict) -> dict: + """Runs a single instance of pvmap generator.""" + test_name = test_config.get('test_name', 'pvmap_generator_test') + test_dir = os.path.join(self._temp_dir.name, test_name) + output_dir = os.path.join(test_dir, 'generated') + logging.info( + f'Running pvmap generator for test {test_name}, output in {output_dir}, config: {test_config}' + ) + env_dict = dict(self.get_env_dict()) + env_dict.update(get_env_dict(test_config.get("env_file"))) + # Build the pvmap generator commandline + cmd_args = dict() + # Add default arguments for output + + cmd_args = merged_args([ + { + '--output_path': output_dir + }, + test_config.get('pvmap_generator_args', {}), + ]) + output_arg = get_arg(cmd_args, '--output_path') + if output_arg: + output_dir = os.path.realpath(output_arg) + if not output_dir.endswith('/'): + output_dir = os.path.dirname(output_dir) + + cwd = self.get_local_dir() + script_status = run_script( + interpreter=sys.executable, + script=os.path.join(os.path.dirname(_SCRIPT_DIR), + 'pvmap_generator.py'), + args=cmd_args, + cwd=cwd, + env=env_dict, + output_dir=output_dir, + log_dir=os.path.join(output_dir, 'debug'), + ) + + # Check for generated files + generated_config = { + 'pv_map': os.path.join(output_dir, 'generated_pvmap.csv'), + 'config_file': os.path.join(output_dir, 'generated_metadata.csv'), + } + script_status['generated_config'] = generated_config + for arg, file in generated_config.items(): + if not os.path.exists(file): + logging.error( + f'Failed to generate {file} for test: {test_name}') + script_status['status'] = 'FAIL' + + return script_status + + def run_sample_statvar_processor(self, test_config: dict, + generated_config: dict) -> dict: + """Run statvar processor on the sample input using the generated configs.""" + test_name = test_config.get('test_name', 'sample_statvar_processor') + test_dir = os.path.join(self._temp_dir.name, test_name) + output_dir = os.path.join(test_dir, 'sample') + env_dict = dict(self.get_env_dict()) + env_dict.update(get_env_dict(test_config.get("env_file"))) + + # Get commandline args for stavtar processor + # to use generated pvmap files. + cmd_args = merged_args([ + generated_config, + { + '--output_path': + output_dir, + '--output_data_pvs': + os.path.join(output_dir, 'output_data_pvs.csv'), + }, + ]) + script_status = self.run_statvar_processor(test_config, cmd_args) + + return script_status + + def generate_metrics(self, test_config: dict, test_status: dict) -> dict: + """Generates metrics for a test config. + + Args: + test_config: Dictionary with the test config. + + Returns: + Dictionary with the metrics. + """ + test_name = test_config.get('test_name', 'sample_statvar_processor') + test_dir = os.path.join(self._temp_dir.name, test_name) + outputs = test_config.get('pvmap_generator_outputs') + output_stats = self.verify_outputs(outputs, test_config, + test_status.get('output_dir')) + stats = {} + for output_name, stats in output_stats: + output_stats = output.get('counters', {}) + output_stats = self.get_stats_from_diff_counters(output_stats) + file_util.file_write_csv_dict( + output_stats, + os.path.join(test_dir, output_name + '-diff-counters.csv')) + stats[output_name] = output_stats + logging.info(f'Diff stats for {output_name}: {output_stats}') + + return stats + + def get_stats_from_diff_counters(self, + diff_stats: dict, + stats: dict = None) -> dict: + """Returns precision and recall stats from diff counters. + + Args: + diff_stats: diff counters for diff between actual and expected output files. + stats: output stats dictionary into which stats are added. + + """ + if stats is None: + stats = dict({ + 'false_positive': 0, + 'false_negative': 0, + 'true_positive': 0, + }) + counters = diff_stats.get('counters', {}) + matched = counters.get('pvs-matched', 0) + false_negative = counters.get('pvs-added', 0) + false_positive = counters.get('pvs-deleted', 0) + # Treat modfied as deleted+added + modified = counters.get('pvs-modified', 0) + false_negative += modified + false_positive += modified + + stats['true_positive'] += matched + stats['false_positive'] += false_positive + stats['false_negative'] += false_negative + + # Compute precision and recall + true_positive = stats['true_positive'] + precision = true_positive / (max( + true_positive + stats['false_positive'], 1)) + recall = true_positive / (max(true_positive + stats['false_negative'], + 1)) + + stats['precision'] = precision + stats['recall'] = recall + stats['f1'] = 2 * (precision * recall) / max(precision + recall, 1) + + return stats + + def run_test(self, test_output: str = None) -> dict: + """Runs the pvmap generator and returns the comparison metrics. + + Args: + test_output: JSON file with the test output status + + Returns: + JSON dict with the test summary + """ + cwd = self.get_local_dir() + os.chdir(cwd) + return_status = {'status': 'PASS'} + for index, test_config in enumerate(self._test_config): + test_status = {} + test_name = test_config.get('test_name') + if not test_name: + test_name = f'pvmap-generator-test-{index}' + basedir = os.path.basename(self.get_local_dir()) + if basedir: + test_name += '-' + basedir + test_config['test_name'] = test_name + logging.info(f'Running pvmap generator test: {index}:{test_name}') + pvmap_generator_status = self.run_pvmap_generator(test_config) + test_status['pvmap_generator_status'] = pvmap_generator_status + + logging.info( + f'Running statvar processor for test: {index}:{test_name}') + statvar_status = self.run_sample_statvar_processor( + test_config, pvmap_generator_status.get('generated_config', {})) + + test_status['statvar_processor_status'] = statvar_status + diff --git a/tools/statvar_importer/mcf_diff.py b/tools/statvar_importer/mcf_diff.py index e7eaa825f8..eae35b90cd 100644 --- a/tools/statvar_importer/mcf_diff.py +++ b/tools/statvar_importer/mcf_diff.py @@ -190,10 +190,11 @@ def diff_mcf_node_pvs(node_1: dict, pvs_deleted = set() pvs_added = set() pvs_modified = set() + pvs_matched = set() for d in diff: diff_str.append(d) - if d[0] == ' ': - counters and counters.add_counter(f'PVs-matched', 1) + if d[0] == ' ' and len(d) > 1: + counters and counters.add_counter(f'pvs-matched', 1) else: prop = '' value = '' @@ -204,10 +205,16 @@ def diff_mcf_node_pvs(node_1: dict, if prop: pvs_deleted.add(prop) has_diff = True - if d[0] == '+': + elif d[0] == '+': if prop: pvs_added.add(prop) has_diff = True + elif d[0] == '?': + # Ignore modifications that already show as delete/add + has_diff = True + else: + if prop: + pvs_matched.add(prop) if has_diff: pvs_modified = pvs_deleted.intersection(pvs_added) pvs_added = pvs_added.difference(pvs_modified) @@ -217,6 +224,7 @@ def diff_mcf_node_pvs(node_1: dict, (pvs_modified, 'modified'), (pvs_added, 'added'), (pvs_deleted, 'deleted'), + (pvs_matched, 'matched'), ]: for prop in props: counters.add_counter(f'pvs-{diff_type}', 1) @@ -230,7 +238,9 @@ def diff_mcf_node_pvs(node_1: dict, else: counters and counters.add_counter(f'nodes-missing-in-mcf1', 1) else: - counters and counters.add_counter(f'nodes-matched', 1) + if counters: + counters.add_counter(f'nodes-matched', 1) + counters.add_counter(f'pvs-matched', len(pvs_matched)) return has_diff, '\n'.join(diff_str), pvs_added, pvs_deleted, pvs_modified diff --git a/tools/statvar_importer/stat_var_processor.py b/tools/statvar_importer/stat_var_processor.py index 487cce556f..d0184e022c 100644 --- a/tools/statvar_importer/stat_var_processor.py +++ b/tools/statvar_importer/stat_var_processor.py @@ -83,7 +83,7 @@ from mcf_filter import drop_existing_mcf_nodes from mcf_diff import fingerprint_node, fingerprint_mcf_nodes, diff_mcf_node_pvs from place_resolver import PlaceResolver -from property_value_mapper import PropertyValueMapper +from property_value_mapper import PropertyValueMapper, write_pv_map from schema_resolver import SchemaResolver from json_to_csv import file_json_to_csv from schema_generator import generate_schema_nodes, generate_statvar_name @@ -1429,6 +1429,8 @@ def __init__( self._config.get('numeric_data_key', 'Number'), self._config.get('pv_lookup_key', 'Key'), ] + # Save per input cell PVs if required. + self._data_pvs = {} if self._config.get('output_data_pvs') else None def generate_pvmap(self): """Generate a PV Map from the input data.""" @@ -2272,6 +2274,8 @@ def process_row(self, row: list, row_index: int): row_index, col_index + 1) and self.process_stat_var_obs_pvs( merged_col_pvs, row_index, col_index): row_svobs += 1 + if self._data_pvs is not None: + self._data_pvs[self._file_context] = merged_col_pvs self.process_row_header_pvs(row, row_index, row_col_pvs, row_svobs, cols_with_pvs) # If row has no SVObs but has PVs, it must be a header. @@ -2818,6 +2822,11 @@ def write_outputs(self, output_path: str): columns=self._config.get('output_columns', []), output_tmcf_file=output_tmcf_file, ) + output_data_pvs = self._config.get('output_data_pvs') + if output_data_pvs and self._data_pvs: + write_pv_map(self._data_pvs, output_data_pvs) + self._counters.add_counter('output-data-pvs', len(self._data_pvs)) + self._counters.print_counters() counters_filename = self._config.get('output_counters', output_path + '_counters.txt') diff --git a/tools/statvar_importer/stat_var_test_runner.py b/tools/statvar_importer/stat_var_test_runner.py new file mode 100644 index 0000000000..ed9a6fe99b --- /dev/null +++ b/tools/statvar_importer/stat_var_test_runner.py @@ -0,0 +1,524 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Utilities to test statvar imports. + +StatVar imports use a JSON test config for regression tests that invoke +statvar processor with a specified set of configs on a test input and +verifies the expected output files. +The test config is a list of statvar processor command line invocation +settings with each invocation as a dictionary with the following sections:a + test_name: name of the test for logs + env_file: a file with environment variables such as API keys + statvar_processor_args: dictionary with the command line arguments + for statvar processor configs. + expected_outputs: + A dictionary of expected output files mapped to output files. + + All file references are releative to the import folder with the + test config. + +Example: +[ + # Parameters for a single invocation of statvar processor + { + "test_name": "Test_Import", + + # Enviroment file loaded with dictionary of env settings + # such as API keys + "env_file": "gs://datcom-prod-imports/config/test_env.csv" + + # Statvar processor command line arguments + "statvar_processor_args": { + "config_file": "metadata.csv", + "pv_map": "import_pvmap.csv", + "places_resolved_csv": "places.csv", + "input_data": "test_data/sample_input.csv", + }, + + # Statvar processor output files with expected outputs to compare with + "statvar_processor_outputs": [ + { + "output_name": "Test_Import_Observations", + "output_file": "", + "expected_output_file": "test_data/sample_output.csv", + }, + ] + } +] + +""" + +import os +import sys +import subprocess +import tempfile +import time + +from absl import app +from absl import flags +from absl import logging + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(_SCRIPT_DIR) +sys.path.append(os.path.dirname(_SCRIPT_DIR)) +sys.path.append(os.path.dirname(os.path.dirname(_SCRIPT_DIR))) +sys.path.append( + os.path.join(os.path.dirname(os.path.dirname(_SCRIPT_DIR)), 'util')) + +import file_util +from mcf_diff import diff_mcf_files +from mcf_file_util import get_value_list + +from counters import Counters + +flags.DEFINE_string('test_config', '', 'JSON test config file') +flags.DEFINE_string('test_output', '', 'Output JSON file with test summary') + +_FLAGS = flags.FLAGS + +_BASH_PATH = os.environ.get('BASH_PATH', 'bash') + + +class StatVarProcessorTestRunner: + """Class to run statvar processor for a test config + """ + + def __init__(self, test_config_file: str = None, test_config: dict = None): + self._temp_dir = tempfile.TemporaryDirectory() + self._test_config = [] + if test_config: + self._test_config.extend(test_config) + self._env_dict = {} + self.load_config(test_config_file) + self._counters = Counters() + + def __del__(self): + """Cleanup local files.""" + self._temp_dir.cleanup() + + def load_config(self, test_config_file: str): + """Loads a test config from a JSON file.""" + if test_config_file: + file_config = file_util.file_load_py_dict(test_config_file) + logging.info(f'Loading test config: {file_config}') + self._test_config.extend(file_config) + self._test_config_file = os.path.realpath(test_config_file) + self._env_dict.update( + get_env_dict(self._test_config[0].get('env_file', ''))) + + def get_config(self) -> dict: + """Returns the test config.""" + return self._test_config + + def get_local_dir(self) -> str: + """Returns the local directory for the test.""" + if self._test_config_file: + return os.path.dirname(self._test_config_file) + return os.getcwd() + + def get_env_dict(self) -> dict: + """Returns the environment variables for the test.""" + return self._env_dict + + def run_tests(self, test_output: str = None) -> dict: + """Runs all the statvar processor test and verified each output. + + Args: + test_output: JSON file with the test output status + + Returns: + JSON dict with the test summary + """ + cwd = self.get_local_dir() + os.chdir(cwd) + logging.info( + f'Running statvar processor for {len(self._test_config)} tests from {cwd}' + ) + return_status = {'status': 'PASS'} + for index, test_config in enumerate(self._test_config): + test_name = test_config.get('test_name') + if not test_name: + test_name = f'statvar-processor-test-{index}' + basedir = os.path.basename(self.get_local_dir()) + if basedir: + test_name += '-' + basedir + test_config['test_name'] = test_name + logging.info(f'Running statvar processor test: {index}:{test_name}') + test_status = self.run_statvar_processor(test_config) + if test_status.get('returncode', 1) != 0: + logging.error( + f'statvar processor test failed for {test_name}: {test_status}' + ) + return_status['status'] = 'FAIL' + else: + outputs = test_config.get('statvar_processor_outputs') + if outputs: + output_status = self.verify_outputs( + outputs, test_config, test_status.get('output_dir')) + if output_status.get('status', '') != 'PASS': + logging.error( + f'Failed to verify outputs for test: {index}:{test_name}:{output_status}' + ) + return_status['status'] = 'FAIL' + test_status['output_status'] = output_status + return_status[test_name] = test_status + logging.info( + f'statvar processor test: {index}:{test_name}: {return_status["status"]}' + ) + if test_output: + file_util.file_write_py_dict(return_status, test_output) + return return_status + + def run_statvar_processor(self, + config: dict, + override_args: dict = {}) -> dict: + """Runs a single instance of statvar processor.""" + test_name = config.get('test_name', 'statvar_processor_test') + test_dir = os.path.join(self._temp_dir.name, test_name) + output_dir = os.path.join(test_dir, 'output') + env_dict = dict(self.get_env_dict()) + env_dict.update(get_env_dict(config.get("env_file"))) + # Build the statvar processor commandline + cmd_args = merge_args([ + { + '--output_path': '"{output_dir}"', + '--output_counters': '"{output_dir}/statvar_counters.json"', + }, + config.get('statvar_processor_args', {}), + override_args, + ]) + output_arg = get_arg(cmd_args, '--output_path') + if output_arg: + output_dir = os.path.realpath(output_arg) + if not output_dir.endswith('/'): + output_dir = os.path.dirname(output_dir) + cwd = self.get_local_dir() + script_status = run_script( + interpreter=sys.executable, + script=os.path.join(_SCRIPT_DIR, 'stat_var_processor.py'), + args=cmd_args, + cwd=cwd, + env=env_dict, + output_dir=output_dir, + log_dir=os.path.join(output_dir, 'debug'), + ) + script_status['output_dir'] = output_dir + return script_status + + def verify_outputs(self, outputs: list[dict], config: dict, + output_path: dir) -> dict: + """Compare actual and expected outputs. + + Args: + outputs: list of expected and actual outputs to be compared. + each item in the list is a dictionary: + { + output_name: (optional) name of the output for debug and logs + output_file: actual output file to be compared. + this is a local file or path relative to output_path + expected_output_file: path to expected file to be compared with + diff_config: (optional) dictionary with diff configs such as: + ignore_property: 'name' + } + + Returns: + dictionary with the overall status as well as a list + of status per output. + { + 'status': 'PASS' if there are no diffs else 'FAIL' + + # diff summary per output + '': { + + 'diff_status': 'MATCHED' or 'DELETED' or 'MODIFIED' + 'deleted': count of deleted nodes + 'modified': count of modified nodes + }, + } + """ + logging.info(f'Comparing outputs: {outputs}') + return_status = {'status': 'PASS'} + for index, output in enumerate(outputs): + expected_files = file_util.file_get_matching( + output.get('expected_output_file')) + if not expected_files: + logging.warning( + f'Ignoring config without expected file: {output}') + continue + output_name = output.get('name') + if not output_name: + output_name = f'output_{index}_{os.path.basename(expected_files[0])}' + if output_name in return_status: + output_name = output_name + '-' + str(index) + actual_files = output.get('output_file') + actuals = [] + for file in get_value_list(actual_files): + matching_files = file_util.file_get_matching( + os.path.join(output_path, file)) + if not matching_files: + matching_files = file_util.file_get_matching(file) + if matching_files: + actuals.extend(matching_files) + if not actuals: + logging.error(f'Missing outputs for {output}') + return_status['status'] = 'ERROR' + diff_status = diff_files( + actuals, expected_files, output.get('diff_config', {}), + os.path.join(output_path, f'{output_name}.diff')) + if diff_status.get('diff_status') != 'MATCHED': + return_status['status'] = 'FAIL' + logging.error( + f'Failed to match {index}:{output}, diff: {diff_status}') + return_status[output_name] = diff_status + return_status[output_name]['name'] = output_name + logging.info( + f'Verify outputs: {return_status.get("status")} for {outputs}') + return return_status + + +def merge_args(self, args: list[dict]) -> dict: + """Returns a dictionary of command line args from a list of args. + + Args: + args: list of dictionary of command line argument and valies: + [ + { 'arg1': 'value1', ...}, + { ...}, + ... + } + + Returns: + Dictionary of command line args with the value from the last arg. + """ + + if not isinstance(args, list): + args = list(args) + + # Merge arguments keeping the last value + merged_args = dict() + for args_dict in args: + for arg, value in args_dict.items(): + if value is not None and not arg.startswith('--'): + arg = '--' + arg + merged_args[arg] = value + + return merged_args + + +def get_args_list(self, args: dict) -> list: + """Returns a list of command line args.""" + cmd_args = [] + if isinstance(args, dict): + for arg, val in args.items(): + if val: + cmd_args.append(f'{arg}={val}') + else: + cmd_args.append(f'{arg}') + elif isinstance(args, list): + cmd_args.extend(args) + else: + cmd_args.append(args) + return cmd_args + + +def get_arg(self, args_dict: dict, arg: str, default=None) -> str: + """Returns the value of a specific arg if present.""" + return args_dict.get(arg, default) + + +def get_env_dict(filename: str) -> dict: + """Returns a dictionary of env variable settings from a file. + + Args: + filename: file with the environemnt settings. + It can be a csv, json, txt or shell file. + + Returns: + dictionary of env variable to values mappings. + """ + env_dict = {} + if isinstance(filename, dict): + # arguent is a dictionary of env variables. Use it as is. + return filename + + env_files = file_util.file_get_matching(filename) + for env_file in env_files: + _, file_ext = os.path.splitext(env_file) + if file_ext == '.sh' or file_ext == ".txt" or file_ext == '.env': + # File has one variable per line + with file_util.FileIO(env_file) as fp: + for line in fp.readlines(): + line = line.strip() + if line.startswith('#'): + # Ignore commented lines + continue + if '=' in line: + env_var, value = line.split('=', 1) + env_var = env_var.strip().strip('"') + value = value.strip().strip('"') + env_dict[env_var] = value + else: + # Assume the file is a dictionary + env_dict.update(file_util.file_load_py_dict(env_file)) + + logging.info(f'Loaded {len(env_dict)} env vars from {env_files}') + return env_dict + + +def run_script(interpreter: str, script: str, args: list | dict, cwd: str, + env: dict, output_dir: str, log_dir: str) -> dict: + """Run a commandline script as a child process + + Args: + interpreter: interpreter for the script such as python, bash + script: local or full path to the script file. + args: command line arguments for the script as a list + cwd: current directory for the script + env: dictionary of environment variables. + output_dir: fodler for output files generated by the script, if any. + log_dir: directory to store logs. + + Returns: + dictionary of command and status + """ + cmd_args = [] + if interpreter: + cmd_args.append(interpreter) + else: + if script and script.endswith('.py'): + # For python scripts use the current script's python binary + interpreter = sys.executable + if script and script.endswith('.sh'): + # For shell scripts use bash + interpreter = _BASH_PATH + if script: + cmd_args.append(script) + if args: + cmd_args.extend(get_args_list(args)) + + logging.info(f'Running command: {cmd_args} in {cwd}, env: {env}') + env_dict = dict(os.environ) + if env: + env_dict.update(env) + start_time = time.perf_counter() + process = subprocess.Popen( + cmd_args, + cwd=cwd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env_dict, + ) + + # Log output continuously until the command completes. + stderr_file = os.path.join(log_dir, 'stderr.log') + stdout_file = os.path.join(log_dir, 'stdout.log') + with file_util.FileIO(stderr_file, 'wb') as f_err: + for line in process.stderr: + f_err.write(line) + logging.info(f'stderr: {line}') + with file_util.FileIO(stdout_file, 'wb') as f_out: + for line in process.stdout: + f_out.write(line) + logging.info(f'stdout: {line}') + + # Wait for process to complete + process.wait() + end_time = time.perf_counter() + return_code = process.returncode + duration = end_time - start_time + + logging.info( + f'Completed command: {cmd_args}, return code: {return_code}, time: {duration} secs, logs in: {log_dir}' + ) + + return { + 'command': ' '.join(cmd_args), + 'args': args, + 'cwd': cwd, + 'returncode': return_code, + 'output_path': output_dir, + 'debug': log_dir, + 'duration': duration, + 'stdout': stdout_file, + 'stderr': stderr_file, + } + + +def diff_files(actual: str, + expected: str, + diff_config: dict = {}, + diff_output_file: str = None) -> dict: + """Compares actual and expected files and returns dictionary with results. + + Args: + actual: list of actual files as csv or mcf. + expected: list of expected output files as csv or mcf + diff_config: dictionary of diff settings (refer to mcf_diff:diff_mcf_files) + compare_dcids: list of dcids to be compared + compare_nodes_with_pv: only compare nodes that have a listed propeorty:value + ignore_nodes_with_pv: ignore nodes with any of the property:value listed + compare_property: only compare listed propeorties in a node + + Returns: + { + status: MATCH or DELETED or MODIFIED + diff_log: log file with all the text style diffs with +|- prefixes. + missing: count of expected nodes missing in actual + modified: count of nodes expected nodes with modified pvs in actual + sample: a sample of 100 lines of diff output + } + """ + + counters = Counters() + diff_str = diff_mcf_files(actual, expected, diff_config, counters) + matched = counters.get_counter('nodes-matched') + deleted = counters.get_counter('dcid-missing-in-nodes1') + added = counters.get_counter('dcid-missing-in-nodes2') + modified = counters.get_counter('nodes-with-diff') + status = 'MATCHED' + if modified: + status = 'MODIFIED' + if deleted: + status = 'DELETED' + return_status = { + 'actual': actual, + 'expected': expected, + 'diff_status': status, + 'missing': deleted, + 'modified': modified, + 'added': added, + 'matched': matched, + 'counters': counters.get_counters(), + } + logging.info(f'Diff summary: {return_status}') + + if diff_str: + if diff_output_file: + with file_util.FileIO(diff_output_file, 'w') as diff_file: + diff_file.write(diff_str) + return_status['diff_output'] = diff_output_file + return_status['diff_sample'] = diff_str[:1000] + + return return_status + + +def main(_): + statvar_processor_tester = StatVarProcessorTestRunner( + test_config_file=_FLAGS.test_config) + test_result = statvar_processor_tester.run_tests(_FLAGS.test_output) + logging.info(f'Test result: {test_result}') + + +if __name__ == '__main__': + app.run(main)