diff --git a/create_trace_mapping.py b/create_trace_mapping.py index 900106b..2b8ef57 100644 --- a/create_trace_mapping.py +++ b/create_trace_mapping.py @@ -1,6 +1,4 @@ import yaml -from nemosis import static_table - from generator_to_trace_draft_mapper import ( draft_solar_generator_to_trace_mapping, draft_solar_rez_mapping, @@ -9,6 +7,7 @@ get_all_generators, gets_rezs, ) +from nemosis import static_table workbook = "D:/isp_2024_data/2024-isp-inputs-and-assumptions-workbook.xlsx" all_generators = get_all_generators(workbook) diff --git a/generator_to_trace_draft_mapper.py b/generator_to_trace_draft_mapper.py deleted file mode 100644 index 54a6256..0000000 --- a/generator_to_trace_draft_mapper.py +++ /dev/null @@ -1,144 +0,0 @@ -import os - -import pandas as pd -from fuzzywuzzy import fuzz, process -from isp_workbook_parser import Parser, TableConfig - -from isp_trace_parser.metadata_extractors import ( - extract_solar_trace_metadata, - extract_wind_trace_metadata, -) - - -def get_all_generators(workbook_filepath): - workbook = Parser(workbook_filepath) - existing_gens = workbook.get_table("existing_generator_summary") - existing_gens["Status"] = "existing" - committed_gens = workbook.get_table("committed_generator_summary") - committed_gens["Status"] = "committed" - anticipated_gens = workbook.get_table("anticipated_projects_summary") - anticipated_gens["Status"] = "anticipated" - additional_gens = workbook.get_table("additional_projects_summary") - additional_gens["Status"] = "additional" - - existing_gens = existing_gens.rename( - columns={existing_gens.columns.values[0]: "Generator"} - ) - committed_gens = committed_gens.rename( - columns={committed_gens.columns.values[0]: "Generator"} - ) - anticipated_gens = anticipated_gens.rename( - columns={anticipated_gens.columns.values[0]: "Generator"} - ) - additional_gens = additional_gens.rename( - columns={additional_gens.columns.values[0]: "Generator"} - ) - - all_gens = pd.concat( - [existing_gens, 
committed_gens, anticipated_gens, additional_gens] - ) - - all_gens = all_gens.loc[:, ["Generator", "Technology type"]] - - return all_gens - - -def gets_rezs(workbook_filepath): - table_config = TableConfig( - name="rezs", - sheet_name="Renewable Energy Zones", - header_rows=7, - end_row=50, - column_range="B:G", - ) - workbook = Parser(workbook_filepath) - rezs = workbook.get_table_from_config(table_config) - rezs = rezs.loc[:, ["Name"]] - return rezs - - -def find_best_match(plant_name, csv_files): - best_match = process.extractOne(plant_name, csv_files, scorer=fuzz.token_set_ratio) - best_match = best_match[0] if best_match else None - best_match = best_match - return best_match - - -def find_best_match_two_columns(row, csv_files): - match1 = process.extractOne(row["Generator"], csv_files) - best_match_plant_name = match1[0] if match1 else None - score_plant_name = match1[1] if match1 else None - - match2 = process.extractOne(row["DUID"], csv_files) - best_match_duid = match2[0] if match2 else None - score_duid = match2[1] if match2 else None - - if score_plant_name > score_duid: - best_match = best_match_plant_name - else: - best_match = best_match_duid - return best_match - - -def draft_solar_generator_to_trace_mapping(solar_generators, solar_trace_directory): - csv_file_names = [ - f for f in os.listdir(solar_trace_directory) if f.endswith(".csv") - ] - csv_file_metadata = [extract_solar_trace_metadata(f) for f in csv_file_names] - csv_project_names = [ - f["name"] for f in csv_file_metadata if f["file_type"] == "project" - ] - solar_generators["CSVFile"] = solar_generators["Generator"].apply( - lambda x: find_best_match(x, csv_project_names) - ) - solar_generators = solar_generators.set_index("Generator")["CSVFile"].to_dict() - return solar_generators - - -def draft_solar_rez_mapping(rezs, rezs_trace_directory): - csv_file_names = [f for f in os.listdir(rezs_trace_directory) if f.endswith(".csv")] - csv_file_metadata = [extract_solar_trace_metadata(f) for f 
in csv_file_names] - csv_rez_names = [f["name"] for f in csv_file_metadata if f["file_type"] == "area"] - rezs["CSVFile"] = rezs["Name"].apply(lambda x: find_best_match(x, csv_rez_names)) - rezs = rezs.set_index("Name")["CSVFile"].to_dict() - return rezs - - -def draft_wind_generator_to_trace_mapping( - wind_generators, wind_duids_and_station_names, wind_trace_directory -): - csv_file_names = [f for f in os.listdir(wind_trace_directory) if f.endswith(".csv")] - csv_file_metadata = [extract_wind_trace_metadata(f) for f in csv_file_names] - csv_project_names = [ - f["name"] for f in csv_file_metadata if f["file_type"] == "project" - ] - - wind_station_names = list(wind_duids_and_station_names["Station Name"]) - - wind_generators["Station Name"] = wind_generators["Generator"].apply( - lambda x: find_best_match(x, wind_station_names) - ) - wind_generators = pd.merge( - wind_generators, wind_duids_and_station_names, how="left", on="Station Name" - ) - wind_generators = wind_generators.drop_duplicates(["Generator"]) - - wind_generators["CSVFile"] = wind_generators.apply( - lambda x: find_best_match_two_columns(x, csv_project_names), axis=1 - ) - - wind_generators = wind_generators.loc[ - :, ["Generator", "Station Name", "DUID", "CSVFile"] - ] - - wind_generators = wind_generators.set_index("Generator").to_dict(orient="index") - return wind_generators - - -def draft_wind_rez_mapping(rezs, rezs_trace_directory): - csv_file_names = [f for f in os.listdir(rezs_trace_directory) if f.endswith(".csv")] - csv_file_metadata = [extract_wind_trace_metadata(f) for f in csv_file_names] - csv_rez_names = [f["name"] for f in csv_file_metadata if f["file_type"] == "area"] - rezs["CSVFile"] = rezs["Name"].apply(lambda x: find_best_match(x, csv_rez_names)) - rezs = rezs.set_index("Name")["CSVFile"].to_dict() - return rezs diff --git a/src/isp_trace_parser/metadata_extractors.py b/src/isp_trace_parser/metadata_extractors.py index d579d6f..a99b450 100644 --- 
a/src/isp_trace_parser/metadata_extractors.py +++ b/src/isp_trace_parser/metadata_extractors.py @@ -1,65 +1,6 @@ import re -def extract_solar_trace_metadata(filename): - # Case 1: Match filenames that have a name, a tech, followed by RefYear - pattern1 = re.compile( - r"^(?P[A-Za-z0-9_\-]+)_(?P[A-Z]+)_RefYear(?P\d{4})\.csv$" - ) - - # Case 2: Match filenames that have a rez, a name, and tech, followed by RefYear - pattern2 = re.compile( - r"^[A-Z]+_(?P[A-Z0-9]+)_[A-Za-z0-9_\-]+_(?P[A-Z]+)_RefYear(?P\d{4})\.csv$" - ) - - # Try to match with pattern 2 first - match2 = pattern2.match(filename) - if match2: - match_data = match2.groupdict() - match_data["file_type"] = "zone" - match_data["reference_year"] = int(match_data["reference_year"]) - return match_data - - # Otherwise, try to match with pattern 1 (just name and year) - match1 = pattern1.match(filename) - if match1: - match_data = match1.groupdict() - match_data["file_type"] = "project" - match_data["reference_year"] = int(match_data["reference_year"]) - return match_data - - raise ValueError(f"Filename '{filename}' does not match the expected pattern") - - -def extract_wind_trace_metadata(filename): - # Case 1: Match filenames that have a simple name followed by RefYear - pattern1 = re.compile(r"^(?P.*)_RefYear(?P\d{4})\.csv$") - - # Case 2: Match filenames that have a resource type and a name followed by RefYear - pattern2 = re.compile( - r"^(?P[A-Z0-9]+)_(?PW[A-Z]+)_[A-Za-z_\-]+_RefYear(?P\d{4})\.csv$" - ) - - # Try to match with pattern 2 first - match2 = pattern2.match(filename) - if match2: - match_data = match2.groupdict() - match_data["file_type"] = "zone" - match_data["reference_year"] = int(match_data["reference_year"]) - return match_data - - # Otherwise, try to match with pattern 1 (just name and year) - match1 = pattern1.match(filename) - if match1: - match_data = match1.groupdict() - match_data["file_type"] = "project" - match_data["resource_type"] = "WIND" - match_data["reference_year"] = 
int(match_data["reference_year"]) - return match_data - - raise ValueError(f"Filename '{filename}' does not match the expected pattern") - - def extract_demand_trace_metadata(filename): # Regex pattern to match the structure of the filename pattern = re.compile( diff --git a/src/isp_trace_parser/resource_trace_metadata.py b/src/isp_trace_parser/resource_trace_metadata.py new file mode 100644 index 0000000..7516339 --- /dev/null +++ b/src/isp_trace_parser/resource_trace_metadata.py @@ -0,0 +1,44 @@ +from pathlib import Path + +from isp_trace_parser import mappings + +# Temporary mapping that translates the YAML's resource_type vocabulary to the legacy +# short codes still used downstream for now (filters, parquet columns, output filenames). + +_RESOURCE_TYPE_CODES: dict[str, str] = { + "solar_sat": "SAT", + "solar_ffp": "FFP", + "solar_cst": "CST", + "wind": "WIND", + "wind_high": "WH", + "wind_medium": "WM", + "wind_offshore_fixed": "WFX", + "wind_offshore_floating": "WFL", +} + + +def build( + files: list[Path], + version: str, +) -> dict[Path, dict[str, str | int]]: + """Build metadata for resource files by lookup in the resource mapping. + + The mapping key is the trace stem (the filename with `_RefYear{year}.csv` + stripped) so `{stem}_RefYear{year}.csv` decomposes back to (stem, year). 
+ """ + + resource_mapping = mappings.load("resources", version=version) + + file_metadata: dict[Path, dict[str, str]] = {} + for path in files: + stem, sep, ref = path.stem.rpartition("_RefYear") + if not sep or not ref.isdigit() or stem not in resource_mapping: + raise ValueError(f"Unexpected trace filename: {path.name}") + entry = resource_mapping[stem] + file_metadata[path] = { + "name": entry["location"], + "reference_year": int(ref), + "resource_type": _RESOURCE_TYPE_CODES[entry["resource_type"]], + "file_type": entry["location_type"], + } + return file_metadata diff --git a/src/isp_trace_parser/solar_traces.py b/src/isp_trace_parser/solar_traces.py index f52ee49..17fc881 100644 --- a/src/isp_trace_parser/solar_traces.py +++ b/src/isp_trace_parser/solar_traces.py @@ -6,8 +6,7 @@ from joblib import Parallel, delayed from pydantic import BaseModel, validate_call -from isp_trace_parser import input_validation, mappings -from isp_trace_parser.metadata_extractors import extract_solar_trace_metadata +from isp_trace_parser import input_validation, mappings, resource_trace_metadata from isp_trace_parser.trace_restructure_helper_functions import ( check_filter_by_metadata, get_all_filepaths, @@ -134,7 +133,7 @@ def parse_solar_traces( parsed_directory = input_validation.parsed_directory(parsed_directory) files = get_all_filepaths(input_directory) - file_metadata = extract_metadata_for_all_solar_files(files) + file_metadata = resource_trace_metadata.build(files, version="2024") resource_mapping = mappings.load("resources") project_name_mapping = { @@ -264,22 +263,6 @@ def write_output_solar_filename(metadata: dict[str, str]) -> str: return f"RefYear{m['reference_year']}_{name}_{m['resource_type']}.parquet" -def extract_metadata_for_all_solar_files( - filepaths: list[Path], -) -> dict[Path, dict[str, str]]: - """ - Extracts metadata for all solar trace files. - - Args: - filepaths: List of Path objects representing the solar trace files. 
- - Returns: - A dictionary with filepaths as keys and metadata dicts as values. - """ - file_metadata = [extract_solar_trace_metadata(str(f.name)) for f in filepaths] - return dict(zip(filepaths, file_metadata)) - - def get_unique_resource_types_in_metadata( metadata_for_trace_files: dict[Path, dict[str, str]], ) -> list[str]: diff --git a/src/isp_trace_parser/wind_traces.py b/src/isp_trace_parser/wind_traces.py index 68db9c8..775f6e5 100644 --- a/src/isp_trace_parser/wind_traces.py +++ b/src/isp_trace_parser/wind_traces.py @@ -6,8 +6,7 @@ from joblib import Parallel, delayed from pydantic import BaseModel, validate_call -from isp_trace_parser import input_validation, mappings -from isp_trace_parser.metadata_extractors import extract_wind_trace_metadata +from isp_trace_parser import input_validation, mappings, resource_trace_metadata from isp_trace_parser.trace_restructure_helper_functions import ( check_filter_by_metadata, filter_mapping_by_names_in_input_files, @@ -135,7 +134,7 @@ def parse_wind_traces( parsed_directory = input_validation.parsed_directory(parsed_directory) files = get_all_filepaths(input_directory) - file_metadata = extract_metadata_for_all_wind_files(files) + file_metadata = resource_trace_metadata.build(files, version="2024") resource_mapping = mappings.load("resources") zone_name_mappings = { @@ -320,16 +319,6 @@ def write_output_wind_zone_filename(metadata: dict) -> str: return f"RefYear{m['reference_year']}_{name}_{m['resource_type']}.parquet" -def extract_metadata_for_all_wind_files(filepaths: list) -> dict: - """ - Extracts metadata for all wind trace files. - - Returns a dict with filepaths as keys and metadata dicts as values. 
- """ - file_metadata = [extract_wind_trace_metadata(str(f.name)) for f in filepaths] - return dict(zip(filepaths, file_metadata)) - - def get_unique_resource_types_in_metadata( metadata_for_trace_files: dict[str:str], ) -> list: diff --git a/tests/test_resource_trace_metadata.py b/tests/test_resource_trace_metadata.py new file mode 100644 index 0000000..a209d04 --- /dev/null +++ b/tests/test_resource_trace_metadata.py @@ -0,0 +1,40 @@ +from pathlib import Path + +import pytest + +from isp_trace_parser import resource_trace_metadata + + +def test_build(): + """One test covers function logic compared with regex approach + + Solar zones / wind zones / extra reference years add no new code-path + coverage (they're different YAML rows, not different code) + """ + files = [ + Path("Adelaide_Desal_FFP_RefYear2011.csv"), + Path("BLUFF1_RefYear2011.csv"), + ] + metadata = resource_trace_metadata.build(files, version="2024") + + assert metadata[files[0]] == { + "name": "Adelaide_Desal", + "reference_year": 2011, + "resource_type": "FFP", + "file_type": "project", + } + assert metadata[files[1]]["resource_type"] == "WIND" + + +@pytest.mark.parametrize( + "filename", + [ + "Adelaide_Desal.csv", # missing _RefYear separator + "Adelaide_Desal_RefYear.csv", # missing year + "Adelaide_Desal_RefYear2011a.csv", # non-digit year + "Mystery_Plant_RefYear2011.csv", # stem not in mapping + ], +) +def test_build_rejects_unexpected_filename(filename): + with pytest.raises(ValueError, match="Unexpected trace filename"): + resource_trace_metadata.build([Path(filename)], version="2024") diff --git a/tests/test_trace_file_meta_data_extraction.py b/tests/test_trace_file_meta_data_extraction.py index 5e50c9b..49346cf 100644 --- a/tests/test_trace_file_meta_data_extraction.py +++ b/tests/test_trace_file_meta_data_extraction.py @@ -1,52 +1,6 @@ from isp_trace_parser import metadata_extractors -def test_solar_trace_metadata_extraction(): - file_name = "Woolooga_SAT_RefYear2023.csv" - metadata = 
metadata_extractors.extract_solar_trace_metadata(file_name) - assert metadata["name"] == "Woolooga" - assert metadata["resource_type"] == "SAT" - assert metadata["reference_year"] == 2023 - assert metadata["file_type"] == "project" - - file_name = "Darling_Downs_FFP_RefYear2023.csv" - metadata = metadata_extractors.extract_solar_trace_metadata(file_name) - assert metadata["name"] == "Darling_Downs" - assert metadata["resource_type"] == "FFP" - assert metadata["reference_year"] == 2023 - assert metadata["file_type"] == "project" - - file_name = "REZ_N0_NSW_Non-REZ_CST_RefYear2023.csv" - metadata = metadata_extractors.extract_solar_trace_metadata(file_name) - assert metadata["name"] == "N0" - assert metadata["resource_type"] == "CST" - assert metadata["reference_year"] == 2023 - assert metadata["file_type"] == "zone" - - -def test_wind_trace_metadata_extraction(): - file_name = "ARWF1_RefYear2023.csv" - metadata = metadata_extractors.extract_wind_trace_metadata(file_name) - assert metadata["name"] == "ARWF1" - assert metadata["resource_type"] == "WIND" - assert metadata["reference_year"] == 2023 - assert metadata["file_type"] == "project" - - file_name = "CAPTL_WF_RefYear2023.csv" - metadata = metadata_extractors.extract_wind_trace_metadata(file_name) - assert metadata["name"] == "CAPTL_WF" - assert metadata["resource_type"] == "WIND" - assert metadata["reference_year"] == 2023 - assert metadata["file_type"] == "project" - - file_name = "N8_WH_Cooma-Monaro_RefYear2023.csv" - metadata = metadata_extractors.extract_wind_trace_metadata(file_name) - assert metadata["name"] == "N8" - assert metadata["resource_type"] == "WH" - assert metadata["reference_year"] == 2023 - assert metadata["file_type"] == "zone" - - def test_demand_trace_metadata_extraction(): file_name = "VIC_RefYear_2011_STEP_CHANGE_POE10_OPSO_MODELLING.csv" metadata = metadata_extractors.extract_demand_trace_metadata(file_name)