Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions tools/agentic_import/metrics/pvmap_generator_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities to generate metrics for PV map generation for statvar imports.
"""

import os
import sys
import subprocess
import tempfile
import time

from absl import app
from absl import flags
from absl import logging

_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(_SCRIPT_DIR)
sys.path.append(os.path.dirname(_SCRIPT_DIR))
sys.path.append(os.path.dirname(os.path.dirname(_SCRIPT_DIR)))
sys.path.append(
os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(_SCRIPT_DIR)), 'util')))
Comment on lines +31 to +33
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current path calculation relies on a specific directory structure and contains a syntax error (os.path.dirname takes only one argument). Instead of hardcoding relative path traversal, use a more robust method to locate the repository root, such as searching for a marker file like 'WORKSPACE' or '.git'.

Suggested change
sys.path.append(
os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(_SCRIPT_DIR)), 'util')))
def find_root(path, marker='WORKSPACE'):
while path != os.path.dirname(path):
if os.path.exists(os.path.join(path, marker)):
return path
path = os.path.dirname(path)
return None
repo_root = find_root(os.path.abspath(_SCRIPT_DIR))
sys.path.append(os.path.join(repo_root, 'util'))
References
  1. Avoid hardcoding paths that rely on specific directory structures. Instead, use more robust methods to locate the repository root, such as searching for marker files (e.g., .git or WORKSPACE).

sys.path.append(
os.path.join(os.path.dirname(os.path.dirname(_SCRIPT_DIR)),
'statvar_importer'))

import file_util
from counters import Counters
from mcf_diff import diff_mcf_files
from mcf_file_util import get_value_list
from stat_var_test_runner import StatVarProcessorTestRunner, run_script


class PVMapGeneratorMetricsRunner(StatVarProcessorTestRunner):
"""Class to generate metrics for PV map generation for statvar imports.

The metrics are generated by running the statvar processor with the test
config and comparing the output with the expected output.
The files for pvmap egenration are specified in the test config as follows:
"test_name": "<test name>",
...
"pvmap_generator_test": {
"pvmap_generator_args": {
"input_data": "<path to input file>",
"output_data": "<path to output file>",
},
"expected_output_files": [
"<path to expected output file>",
"<path to expected output file>",
...
],
}

"""

def __init__(self, test_config_file: str = None, test_config: dict = None):
super().__init__(test_config_file, test_config)

def run_pvmap_generator(self, test_config: dict) -> dict:
"""Runs a single instance of pvmap generator."""
test_name = test_config.get('test_name', 'pvmap_generator_test')
test_dir = os.path.join(self._temp_dir.name, test_name)
output_dir = os.path.join(test_dir, 'generated')
logging.info(
f'Running pvmap generator for test {test_name}, output in {output_dir}, config: {test_config}'
)
env_dict = dict(self.get_env_dict())
env_dict.update(get_env_dict(test_config.get("env_file")))
# Build the pvmap generator commandline
cmd_args = dict()
# Add default arguments for output

cmd_args = merged_args([
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The function merged_args is not defined or imported. It appears you intended to use merge_args from stat_var_test_runner, but it is also not imported. This will result in a NameError at runtime.

{
'--output_path': output_dir
},
test_config.get('pvmap_generator_args', {}),
])
output_arg = get_arg(cmd_args, '--output_path')
if output_arg:
output_dir = os.path.realpath(output_arg)
if not output_dir.endswith('/'):
output_dir = os.path.dirname(output_dir)

cwd = self.get_local_dir()
script_status = run_script(
interpreter=sys.executable,
script=os.path.join(os.path.dirname(_SCRIPT_DIR),
'pvmap_generator.py'),
args=cmd_args,
cwd=cwd,
env=env_dict,
output_dir=output_dir,
log_dir=os.path.join(output_dir, 'debug'),
)

# Check for generated files
generated_config = {
'pv_map': os.path.join(output_dir, 'generated_pvmap.csv'),
'config_file': os.path.join(output_dir, 'generated_metadata.csv'),
}
script_status['generated_config'] = generated_config
for arg, file in generated_config.items():
if not os.path.exists(file):
logging.error(
f'Failed to generate {file} for test: {test_name}')
script_status['status'] = 'FAIL'

return script_status

def run_sample_statvar_processor(self, test_config: dict,
generated_config: dict) -> dict:
"""Run statvar processor on the sample input using the generated configs."""
test_name = test_config.get('test_name', 'sample_statvar_processor')
test_dir = os.path.join(self._temp_dir.name, test_name)
output_dir = os.path.join(test_dir, 'sample')
env_dict = dict(self.get_env_dict())
env_dict.update(get_env_dict(test_config.get("env_file")))

# Get commandline args for stavtar processor
# to use generated pvmap files.
cmd_args = merged_args([
generated_config,
{
'--output_path':
output_dir,
'--output_data_pvs':
os.path.join(output_dir, 'output_data_pvs.csv'),
},
])
script_status = self.run_statvar_processor(test_config, cmd_args)

return script_status

def generate_metrics(self, test_config: dict, test_status: dict) -> dict:
"""Generates metrics for a test config.

Args:
test_config: Dictionary with the test config.

Returns:
Dictionary with the metrics.
"""
test_name = test_config.get('test_name', 'sample_statvar_processor')
test_dir = os.path.join(self._temp_dir.name, test_name)
outputs = test_config.get('pvmap_generator_outputs')
output_stats = self.verify_outputs(outputs, test_config,
test_status.get('output_dir'))
stats = {}
for output_name, stats in output_stats:
output_stats = output.get('counters', {})
output_stats = self.get_stats_from_diff_counters(output_stats)
file_util.file_write_csv_dict(
output_stats,
os.path.join(test_dir, output_name + '-diff-counters.csv'))
stats[output_name] = output_stats
Comment on lines +161 to +167
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are multiple issues in this loop:

  1. output_stats is a dictionary; iterating over it directly yields keys, so unpacking into output_name, stats will fail with a ValueError.
  2. The variable output on line 162 is undefined.
  3. The loop variable stats shadows the stats = {} dictionary initialized on line 160.
  4. It is acceptable to assume the dictionary contains required keys like 'counters' and allow it to fail on a KeyError rather than using defensive defaults.
Suggested change
for output_name, stats in output_stats:
output_stats = output.get('counters', {})
output_stats = self.get_stats_from_diff_counters(output_stats)
file_util.file_write_csv_dict(
output_stats,
os.path.join(test_dir, output_name + '-diff-counters.csv'))
stats[output_name] = output_stats
for output_name, output_data in output_stats.items():
output_counters = output_data['counters']
diff_metrics = self.get_stats_from_diff_counters(output_counters)
file_util.file_write_csv_dict(
diff_metrics,
os.path.join(test_dir, output_name + '-diff-counters.csv'))
stats[output_name] = diff_metrics
References
  1. When a configuration or data dictionary is provided, it is acceptable to assume it contains all required keys and allow it to fail on a KeyError if a key is missing, rather than defensively using default values.

logging.info(f'Diff stats for {output_name}: {output_stats}')

return stats

def get_stats_from_diff_counters(self,
diff_stats: dict,
stats: dict = None) -> dict:
"""Returns precision and recall stats from diff counters.

Args:
diff_stats: diff counters for diff between actual and expected output files.
stats: output stats dictionary into which stats are added.

"""
if stats is None:
stats = dict({
'false_positive': 0,
'false_negative': 0,
'true_positive': 0,
})
counters = diff_stats.get('counters', {})
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

F1 score from node diffs

matched = counters.get('pvs-matched', 0)
false_negative = counters.get('pvs-added', 0)
false_positive = counters.get('pvs-deleted', 0)
# Treat modfied as deleted+added
modified = counters.get('pvs-modified', 0)
false_negative += modified
false_positive += modified

stats['true_positive'] += matched
stats['false_positive'] += false_positive
stats['false_negative'] += false_negative

# Compute precision and recall
true_positive = stats['true_positive']
precision = true_positive / (max(
true_positive + stats['false_positive'], 1))
recall = true_positive / (max(true_positive + stats['false_negative'],
1))

stats['precision'] = precision
stats['recall'] = recall
stats['f1'] = 2 * (precision * recall) / max(precision + recall, 1)

return stats

def run_test(self, test_output: str = None) -> dict:
"""Runs the pvmap generator and returns the comparison metrics.

Args:
test_output: JSON file with the test output status

Returns:
JSON dict with the test summary
"""
cwd = self.get_local_dir()
os.chdir(cwd)
return_status = {'status': 'PASS'}
for index, test_config in enumerate(self._test_config):
test_status = {}
test_name = test_config.get('test_name')
if not test_name:
test_name = f'pvmap-generator-test-{index}'
basedir = os.path.basename(self.get_local_dir())
if basedir:
test_name += '-' + basedir
test_config['test_name'] = test_name
logging.info(f'Running pvmap generator test: {index}:{test_name}')
pvmap_generator_status = self.run_pvmap_generator(test_config)
test_status['pvmap_generator_status'] = pvmap_generator_status

logging.info(
f'Running statvar processor for test: {index}:{test_name}')
statvar_status = self.run_sample_statvar_processor(
test_config, pvmap_generator_status.get('generated_config', {}))

test_status['statvar_processor_status'] = statvar_status

18 changes: 14 additions & 4 deletions tools/statvar_importer/mcf_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,11 @@ def diff_mcf_node_pvs(node_1: dict,
pvs_deleted = set()
pvs_added = set()
pvs_modified = set()
pvs_matched = set()
for d in diff:
diff_str.append(d)
if d[0] == ' ':
counters and counters.add_counter(f'PVs-matched', 1)
if d[0] == ' ' and len(d) > 1:
counters and counters.add_counter(f'pvs-matched', 1)
else:
prop = ''
value = ''
Expand All @@ -204,10 +205,16 @@ def diff_mcf_node_pvs(node_1: dict,
if prop:
pvs_deleted.add(prop)
has_diff = True
if d[0] == '+':
elif d[0] == '+':
if prop:
pvs_added.add(prop)
has_diff = True
elif d[0] == '?':
# Ignore modifications that already show as delete/add
has_diff = True
else:
if prop:
pvs_matched.add(prop)
if has_diff:
pvs_modified = pvs_deleted.intersection(pvs_added)
pvs_added = pvs_added.difference(pvs_modified)
Expand All @@ -217,6 +224,7 @@ def diff_mcf_node_pvs(node_1: dict,
(pvs_modified, 'modified'),
(pvs_added, 'added'),
(pvs_deleted, 'deleted'),
(pvs_matched, 'matched'),
]:
for prop in props:
counters.add_counter(f'pvs-{diff_type}', 1)
Expand All @@ -230,7 +238,9 @@ def diff_mcf_node_pvs(node_1: dict,
else:
counters and counters.add_counter(f'nodes-missing-in-mcf1', 1)
else:
counters and counters.add_counter(f'nodes-matched', 1)
if counters:
counters.add_counter(f'nodes-matched', 1)
counters.add_counter(f'pvs-matched', len(pvs_matched))
return has_diff, '\n'.join(diff_str), pvs_added, pvs_deleted, pvs_modified


Expand Down
11 changes: 10 additions & 1 deletion tools/statvar_importer/stat_var_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
from mcf_filter import drop_existing_mcf_nodes
from mcf_diff import fingerprint_node, fingerprint_mcf_nodes, diff_mcf_node_pvs
from place_resolver import PlaceResolver
from property_value_mapper import PropertyValueMapper
from property_value_mapper import PropertyValueMapper, write_pv_map
from schema_resolver import SchemaResolver
from json_to_csv import file_json_to_csv
from schema_generator import generate_schema_nodes, generate_statvar_name
Expand Down Expand Up @@ -1429,6 +1429,8 @@ def __init__(
self._config.get('numeric_data_key', 'Number'),
self._config.get('pv_lookup_key', 'Key'),
]
# Save per input cell PVs if required.
self._data_pvs = {} if self._config.get('output_data_pvs') else None

def generate_pvmap(self):
"""Generate a PV Map from the input data."""
Expand Down Expand Up @@ -2272,6 +2274,8 @@ def process_row(self, row: list, row_index: int):
row_index, col_index + 1) and self.process_stat_var_obs_pvs(
merged_col_pvs, row_index, col_index):
row_svobs += 1
if self._data_pvs is not None:
self._data_pvs[self._file_context] = merged_col_pvs
self.process_row_header_pvs(row, row_index, row_col_pvs, row_svobs,
cols_with_pvs)
# If row has no SVObs but has PVs, it must be a header.
Expand Down Expand Up @@ -2818,6 +2822,11 @@ def write_outputs(self, output_path: str):
columns=self._config.get('output_columns', []),
output_tmcf_file=output_tmcf_file,
)
output_data_pvs = self._config.get('output_data_pvs')
if output_data_pvs and self._data_pvs:
write_pv_map(self._data_pvs, output_data_pvs)
self._counters.add_counter('output-data-pvs', len(self._data_pvs))

self._counters.print_counters()
counters_filename = self._config.get('output_counters',
output_path + '_counters.txt')
Expand Down
Loading
Loading