diff --git a/src/isp_trace_parser/demand_traces.py b/src/isp_trace_parser/demand_traces.py index 9703fe6..ab17556 100644 --- a/src/isp_trace_parser/demand_traces.py +++ b/src/isp_trace_parser/demand_traces.py @@ -70,17 +70,16 @@ def parse_demand_traces( contains metadata in the following format "_RefYear____.csv". For example, "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.csv". - The trace parser reformats the data and stores the data files in parquet format with metadata columns, - which match the IASR workbook conventions. + The trace parser reformats the data and stores the data files in parquet format with metadata columns. The data format is changed to include a column "datetime" specifying the end of the half hour period the measurement is for in the format %Y-%m-%d %H:%M:%S, a column "value" specifying the measurement value, and metadata columns (subregion, reference_year, scenario, poe, demand_type). The scenario - column contains the mapped scenario name from the IASR workbook. Files are saved with a new naming - convention: "_RefYear___.parquet". + column contains the mapped scenario name from the IASR workbook. Output files keep the AEMO input + filename, with the .csv suffix replaced by .parquet. For the CSV example above, the parsed filename would be: - "Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet" + "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet" By default, all trace data in the input directory is parsed. However, a DemandMetadataFilter can be provided to filter the traces based on metadata. If a metadata type is present in the filter then only traces with a @@ -130,6 +129,7 @@ def parse_demand_traces( """ input_directory = input_validation.input_directory(input_directory) parsed_directory = input_validation.parsed_directory(parsed_directory) + parsed_directory.mkdir(parents=True, exist_ok=True) files = get_all_filepaths(input_directory) file_metadata = demand_trace_metadata.build(files, version="2024") @@ -138,7 +138,6 @@ def parse_demand_traces( partial_func = functools.partial( restructure_demand_file, - all_input_file_metadata=file_metadata, demand_scenario_mapping=demand_scenario_mapping, output_directory=parsed_directory, filters=filters, @@ -147,30 +146,36 @@ def parse_demand_traces( if use_concurrency: max_workers = os.cpu_count() - 2 - Parallel(n_jobs=max_workers)(delayed(partial_func)(file) for file in files) + Parallel(n_jobs=max_workers)( + delayed(partial_func)(file, metadata) + for file, metadata in file_metadata.items() + ) else: - for file in files: - partial_func(file) + for file, metadata in file_metadata.items(): + partial_func(file, metadata) def restructure_demand_file( input_filepath: Path, - all_input_file_metadata: dict[Path, dict[str, str | int]], + file_metadata: dict[str, str | int], demand_scenario_mapping: dict[str, str], output_directory: Path, filters: DemandMetadataFilter | None = None, ) -> None: """ - Restructures a single demand trace file and saves it in a parquet format. + Restructures a single demand trace file and saves it as parquet. + + The output filename is the AEMO input filename with the .csv suffix replaced by + .parquet (e.g. CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.csv + becomes CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet). - This function processes a demand trace file, restructures and saves it in a new format, with the original - input filename stem and a .parquet extension. It handles the mapping of scenario names and applies filters - if provided. + `file_metadata` is changed in place to hold the translated scenario (for including + and storing within the dataframe). Args: input_filepath: Path object representing the input demand trace file. - all_input_file_metadata: Metadata for all input files. + file_metadata: Metadata for this trace file (scenario name mutated in place). demand_scenario_mapping: Dictionary mapping raw scenario names to IASR workbook scenario names. output_directory: Directory where restructured files will be saved. filters: DemandMetadataFilter or None, specifies which traces to parse based on metadata. @@ -181,21 +186,26 @@ def restructure_demand_file( Example: >>> input_filepath = Path('CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.csv') + >>> file_metadata = { + ... 'subregion': 'CNSW', + ... 'scenario': 'HYDROGEN_EXPORT', + ... 'poe': 'POE10', + ... 'demand_type': 'OPSO_MODELLING', + ... 'reference_year': 2011, + ... } + >>> demand_scenario_mapping = {'HYDROGEN_EXPORT': 'Green Energy Exports'} >>> restructure_demand_file( ... input_filepath=input_filepath, + ... file_metadata=file_metadata, ... demand_scenario_mapping=demand_scenario_mapping, ... output_directory='/path/to/output' ... ) # doctest: +SKIP # This will process the input file and save it in parquet format in the specified output directory """ - file_metadata = dict(all_input_file_metadata[input_filepath]) - - file_metadata["scenario"] = get_save_scenario_for_demand_trace( - file_metadata, demand_scenario_mapping - ) + file_metadata["scenario"] = demand_scenario_mapping[file_metadata["scenario"]] parse_file = check_filter_by_metadata(file_metadata, filters) if parse_file: @@ -203,11 +213,7 @@ def restructure_demand_file( trace = trace_formatter(trace) trace = _frame_with_metadata(trace, file_metadata) - save_filepath = output_directory / write_new_demand_filename( - metadata=file_metadata - ) - save_filepath.parent.mkdir(parents=True, exist_ok=True) - + save_filepath = output_directory / f"{input_filepath.stem}.parquet" trace.write_parquet(save_filepath) @@ -225,36 +231,3 @@ def _frame_with_metadata(trace: pl.DataFrame, file_metadata: dict) -> pl.DataFra poe=pl.lit(file_metadata["poe"]), demand_type=pl.lit(file_metadata["demand_type"]), ) - - -def get_save_scenario_for_demand_trace( - file_metadata: dict[str, str], demand_scenario_mapping: dict[str, str] -) -> str: - """ - Maps the raw scenario name to the IASR workbook scenario name. - - Args: - file_metadata: Dictionary containing metadata for the demand trace file. - demand_scenario_mapping: Dictionary mapping raw scenario names to IASR workbook scenario names. - - Returns: - The mapped scenario name as a string. - """ - return demand_scenario_mapping[file_metadata["scenario"]] - - -def write_new_demand_filename(metadata: dict[str, str]) -> str: - """ - Generates the output filename for a demand trace file. - - Args: - metadata: Dictionary containing metadata for the demand trace file. - - Returns: - A string representing the filename. - """ - m = metadata - subregion = m["subregion"].replace(" ", "_") - scenario = m["scenario"].replace(" ", "_") - - return f"{scenario}_RefYear{m['reference_year']}_{subregion}_{m['poe']}_{m['demand_type']}.parquet" diff --git a/tests/test_data/output/Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet b/tests/test_data/output/CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet similarity index 100% rename from tests/test_data/output/Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet rename to tests/test_data/output/CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet diff --git a/tests/test_get_data.py b/tests/test_get_data.py index ddb1d0a..609226a 100644 --- a/tests/test_get_data.py +++ b/tests/test_get_data.py @@ -149,7 +149,7 @@ def test_get_demand_single_reference_year(parsed_trace_trace_directory: Path): test_df_lazy = pl.scan_parquet( TEST_DATA / "output" - / "Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet" + / "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet" ) start_dt, end_dt = _year_range_to_dt_range(2023, 2024, year_type="fy") @@ -182,7 +182,7 @@ def test_get_demand_multiple_reference_year(parsed_trace_trace_directory: Path): test_df_lazy = pl.scan_parquet( TEST_DATA / "output" - / "Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet" + / "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet" ) test_df = ( @@ -366,7 +366,7 @@ def test_demand_single_reference_year(parsed_trace_trace_directory: Path): test_df_lazy = pl.scan_parquet( TEST_DATA / "output" - / "Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet" + / "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet" ) start_dt, end_dt = _year_range_to_dt_range(2023, 2024, year_type="fy") @@ -399,7 +399,7 @@ def test_demand_multiple_reference_years(parsed_trace_trace_directory: Path): test_df_lazy = pl.scan_parquet( TEST_DATA / "output" - / "Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet" + / "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet" ) test_df = ( diff --git a/tests/test_trace_parsers.py b/tests/test_trace_parsers.py index c045908..ac72a08 100644 --- a/tests/test_trace_parsers.py +++ b/tests/test_trace_parsers.py @@ -14,9 +14,7 @@ def test_demand_trace_parsing(use_concurrency: bool): """Test demand trace parsing produces expected parquet output.""" test_demand_csv_directory = TEST_DATA / "demand" - expected_filename = ( - "Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet" - ) + expected_filename = "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet" test_demand_output_parquet = TEST_DATA / "output" / expected_filename with tempfile.TemporaryDirectory() as tmp_parsed_directory: diff --git a/tests/test_writing_save_names.py b/tests/test_writing_save_names.py index 6702326..3fc47a1 100644 --- a/tests/test_writing_save_names.py +++ b/tests/test_writing_save_names.py @@ -51,17 +51,3 @@ def test_write_wind_save_names(): ) assert str(save_filepath) == "RefYear1_a_x.parquet" - - -def test_write_demand_save_names(): - meta_data = { - "scenario": "a", - "reference_year": "1", - "subregion": "x", - "poe": "poe10", - "demand_type": "y", - } - - save_filepath = isp_trace_parser.demand_traces.write_new_demand_filename(meta_data) - - assert str(save_filepath) == "a_RefYear1_x_poe10_y.parquet"