Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 31 additions & 58 deletions src/isp_trace_parser/demand_traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,16 @@ def parse_demand_traces(
contains metadata in the following format "<subregionID>_RefYear_<reference year>_<scenario>_<poe>_<data type>.csv".
For example, "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.csv".

The trace parser reformats the data and stores the data files in parquet format with metadata columns,
which match the IASR workbook conventions.
The trace parser reformats the data and stores the data files in parquet format with metadata columns.
The data format is changed to include a column "datetime" specifying the end of the half hour period
the measurement is for in the format %Y-%m-%d %H:%M:%S, a column "value" specifying the measurement
value, and metadata columns (subregion, reference_year, scenario, poe, demand_type). The scenario
column contains the mapped scenario name from the IASR workbook. Files are saved with a new naming
convention: "<mapped_scenario>_RefYear<reference_year>_<subregion>_<poe>_<demand_type>.parquet".
column contains the mapped scenario name from the IASR workbook. Output files keep the AEMO input
filename, with the .csv suffix replaced by .parquet.

For the CSV example above, the parsed filename would be:

"Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet"
"CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet"

By default, all trace data in the input directory is parsed. However, a DemandMetadataFilter can be provided
to filter the traces based on metadata. If a metadata type is present in the filter then only traces with a
Expand Down Expand Up @@ -130,6 +129,7 @@ def parse_demand_traces(
"""
input_directory = input_validation.input_directory(input_directory)
parsed_directory = input_validation.parsed_directory(parsed_directory)
parsed_directory.mkdir(parents=True, exist_ok=True)

files = get_all_filepaths(input_directory)
file_metadata = demand_trace_metadata.build(files, version="2024")
Expand All @@ -138,7 +138,6 @@ def parse_demand_traces(

partial_func = functools.partial(
restructure_demand_file,
all_input_file_metadata=file_metadata,
demand_scenario_mapping=demand_scenario_mapping,
output_directory=parsed_directory,
filters=filters,
Expand All @@ -147,30 +146,36 @@ def parse_demand_traces(
if use_concurrency:
max_workers = os.cpu_count() - 2

Parallel(n_jobs=max_workers)(delayed(partial_func)(file) for file in files)
Parallel(n_jobs=max_workers)(
delayed(partial_func)(file, metadata)
for file, metadata in file_metadata.items()
)

else:
for file in files:
partial_func(file)
for file, metadata in file_metadata.items():
partial_func(file, metadata)


def restructure_demand_file(
input_filepath: Path,
all_input_file_metadata: dict[Path, dict[str, str | int]],
file_metadata: dict[str, str | int],
demand_scenario_mapping: dict[str, str],
output_directory: Path,
filters: DemandMetadataFilter | None = None,
) -> None:
"""
Restructures a single demand trace file and saves it in a parquet format.
Restructures a single demand trace file and saves it as parquet.

The output filename is the AEMO input filename with the .csv suffix replaced by
.parquet (e.g. CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.csv
becomes CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet).

This function processes a demand trace file, restructures and saves it in a new format, with the original
input filename stem and a .parquet extension. It handles the mapping of scenario names and applies filters
if provided.
`file_metadata` is modified in place: its "scenario" entry is replaced with the mapped
scenario name, which is then written into the scenario column of the output dataframe.

Args:
input_filepath: Path object representing the input demand trace file.
all_input_file_metadata: Metadata for all input files.
file_metadata: Metadata for this trace file (scenario name mutated in place).
demand_scenario_mapping: Dictionary mapping raw scenario names to IASR workbook scenario names.
output_directory: Directory where restructured files will be saved.
filters: DemandMetadataFilter or None, specifies which traces to parse based on metadata.
Expand All @@ -181,33 +186,34 @@ def restructure_demand_file(
Example:
>>> input_filepath = Path('CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.csv')

>>> file_metadata = {
... 'subregion': 'CNSW',
... 'scenario': 'HYDROGEN_EXPORT',
... 'poe': 'POE10',
... 'demand_type': 'OPSO_MODELLING',
... 'reference_year': 2011,
... }

>>> demand_scenario_mapping = {'HYDROGEN_EXPORT': 'Green Energy Exports'}

>>> restructure_demand_file(
... input_filepath=input_filepath,
... file_metadata=file_metadata,
... demand_scenario_mapping=demand_scenario_mapping,
... output_directory='/path/to/output'
... ) # doctest: +SKIP

# This will process the input file and save it in parquet format in the specified output directory
"""
file_metadata = dict(all_input_file_metadata[input_filepath])

file_metadata["scenario"] = get_save_scenario_for_demand_trace(
file_metadata, demand_scenario_mapping
)
file_metadata["scenario"] = demand_scenario_mapping[file_metadata["scenario"]]

parse_file = check_filter_by_metadata(file_metadata, filters)
if parse_file:
trace = read_trace_csv(input_filepath)
trace = trace_formatter(trace)
trace = _frame_with_metadata(trace, file_metadata)

save_filepath = output_directory / write_new_demand_filename(
metadata=file_metadata
)
save_filepath.parent.mkdir(parents=True, exist_ok=True)

save_filepath = output_directory / f"{input_filepath.stem}.parquet"
trace.write_parquet(save_filepath)


Expand All @@ -225,36 +231,3 @@ def _frame_with_metadata(trace: pl.DataFrame, file_metadata: dict) -> pl.DataFra
poe=pl.lit(file_metadata["poe"]),
demand_type=pl.lit(file_metadata["demand_type"]),
)


def get_save_scenario_for_demand_trace(
    file_metadata: dict[str, str], demand_scenario_mapping: dict[str, str]
) -> str:
    """
    Maps the raw scenario name to the IASR workbook scenario name.

    Args:
        file_metadata: Dictionary containing metadata for the demand trace file.
            Its "scenario" entry holds the raw scenario name to translate.
        demand_scenario_mapping: Dictionary mapping raw scenario names to IASR workbook scenario names.

    Returns:
        The mapped scenario name as a string.

    Raises:
        KeyError: If file_metadata has no "scenario" entry, or if the raw scenario
            name has no entry in demand_scenario_mapping.
    """
    raw_scenario = file_metadata["scenario"]
    try:
        return demand_scenario_mapping[raw_scenario]
    except KeyError:
        # A bare KeyError('<name>') is indistinguishable from a missing metadata key,
        # so re-raise the same exception type with the offending scenario and the
        # scenarios that are actually mapped.
        raise KeyError(
            f"No mapping found for demand scenario {raw_scenario!r}; "
            f"known scenarios: {sorted(demand_scenario_mapping)}"
        ) from None


def write_new_demand_filename(metadata: dict[str, str]) -> str:
    """
    Builds the parquet filename used when saving a parsed demand trace.

    The name has the form
    "<scenario>_RefYear<reference_year>_<subregion>_<poe>_<demand_type>.parquet",
    with spaces in the scenario and subregion replaced by underscores.

    Args:
        metadata: Dictionary containing metadata for the demand trace file. The keys
            "subregion", "scenario", "reference_year", "poe" and "demand_type" are read.

    Returns:
        A string representing the filename.
    """
    # Only subregion and scenario are sanitised; the other fields are used verbatim.
    subregion_part = metadata["subregion"].replace(" ", "_")
    scenario_part = metadata["scenario"].replace(" ", "_")
    reference_year = metadata["reference_year"]
    poe = metadata["poe"]
    demand_type = metadata["demand_type"]

    stem = "_".join(
        (
            scenario_part,
            f"RefYear{reference_year}",
            subregion_part,
            f"{poe}",
            f"{demand_type}",
        )
    )
    return stem + ".parquet"
8 changes: 4 additions & 4 deletions tests/test_get_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def test_get_demand_single_reference_year(parsed_trace_trace_directory: Path):
test_df_lazy = pl.scan_parquet(
TEST_DATA
/ "output"
/ "Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet"
/ "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet"
)

start_dt, end_dt = _year_range_to_dt_range(2023, 2024, year_type="fy")
Expand Down Expand Up @@ -182,7 +182,7 @@ def test_get_demand_multiple_reference_year(parsed_trace_trace_directory: Path):
test_df_lazy = pl.scan_parquet(
TEST_DATA
/ "output"
/ "Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet"
/ "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet"
)

test_df = (
Expand Down Expand Up @@ -366,7 +366,7 @@ def test_demand_single_reference_year(parsed_trace_trace_directory: Path):
test_df_lazy = pl.scan_parquet(
TEST_DATA
/ "output"
/ "Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet"
/ "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet"
)

start_dt, end_dt = _year_range_to_dt_range(2023, 2024, year_type="fy")
Expand Down Expand Up @@ -399,7 +399,7 @@ def test_demand_multiple_reference_years(parsed_trace_trace_directory: Path):
test_df_lazy = pl.scan_parquet(
TEST_DATA
/ "output"
/ "Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet"
/ "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet"
)

test_df = (
Expand Down
4 changes: 1 addition & 3 deletions tests/test_trace_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
def test_demand_trace_parsing(use_concurrency: bool):
"""Test demand trace parsing produces expected parquet output."""
test_demand_csv_directory = TEST_DATA / "demand"
expected_filename = (
"Green_Energy_Exports_RefYear2011_CNSW_POE10_OPSO_MODELLING.parquet"
)
expected_filename = "CNSW_RefYear_2011_HYDROGEN_EXPORT_POE10_OPSO_MODELLING.parquet"
test_demand_output_parquet = TEST_DATA / "output" / expected_filename

with tempfile.TemporaryDirectory() as tmp_parsed_directory:
Expand Down
14 changes: 0 additions & 14 deletions tests/test_writing_save_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,3 @@ def test_write_wind_save_names():
)

assert str(save_filepath) == "RefYear1_a_x.parquet"


def test_write_demand_save_names():
    """Check the demand filename is assembled from the metadata fields in the expected order."""
    demand_metadata = dict(
        scenario="a",
        reference_year="1",
        subregion="x",
        poe="poe10",
        demand_type="y",
    )
    expected_name = "a_RefYear1_x_poe10_y.parquet"

    produced_name = isp_trace_parser.demand_traces.write_new_demand_filename(
        demand_metadata
    )

    assert str(produced_name) == expected_name
Loading