diff --git a/imap_processing/ena_maps/ena_maps.py b/imap_processing/ena_maps/ena_maps.py index 6072b34505..36e903587b 100644 --- a/imap_processing/ena_maps/ena_maps.py +++ b/imap_processing/ena_maps/ena_maps.py @@ -1491,11 +1491,25 @@ def build_cdf_dataset( # noqa: PLR0912 # Now set global attributes map_attrs = cdf_attrs.get_global_attributes(f"imap_{instrument}_{level}_enamap") map_attrs["Spacing_degrees"] = str(self.spacing_deg) - for key in ["Data_type", "Logical_source", "Logical_source_description"]: + for key in ["Data_type", "Logical_source"]: map_attrs[key] = map_attrs[key].format( descriptor=descriptor, sensor=sensor, ) + # Use the MapDescriptor to generate the Logical_source_description + # when possible, but preserve previous behavior for non-descriptor + # strings so later validation still raises the intended errors. + try: + md = naming.MapDescriptor.from_string(descriptor) + except ValueError: + map_attrs["Logical_source_description"] = map_attrs[ + "Logical_source_description" + ].format( + descriptor=descriptor, + sensor=sensor, + ) + else: + map_attrs["Logical_source_description"] = md.to_logical_source_description() # Always add the following attributes to the map map_attrs.update( { diff --git a/imap_processing/ena_maps/utils/naming.py b/imap_processing/ena_maps/utils/naming.py index d8aa7b6d9e..5b5a088085 100644 --- a/imap_processing/ena_maps/utils/naming.py +++ b/imap_processing/ena_maps/utils/naming.py @@ -173,6 +173,86 @@ def to_string(self) -> str: ] ) + def _parse_principal_data(self) -> tuple[str, str]: + """ + Parse principal_data and return (data_type, extras) tuple. + + Returns + ------- + tuple[str, str] + A tuple of (data_type, extras) parsed from principal_data. + """ + m = re.match( + r"^(drt|ena|int|isn|spx)(?:(?<=spx)\d+)?([^-_\s]*)$", self.principal_data + ) + if not m: + raise ValueError( + "Invalid principal_data format: " + f"{self.principal_data}. Expected one of 'drt', 'ena', 'int', " + "'isn', or 'spx' optionally followed by digits and trailing " + "non-separator text." + ) + return m.group(1), m.group(2) + + def _get_resolution_str(self, full: bool = False) -> str: + """ + Get formatted resolution string. + + Parameters + ---------- + full : bool, optional + If True, return full format (e.g., "rectangular 2 degree"). + If False, return short format (e.g., "2 deg"). Default is False. + + Returns + ------- + str + Formatted resolution string. + """ + m = re.match(r"^(\d+)deg|nside(\d+)", self.resolution_str) + if not m: + raise ValueError( + f"Invalid resolution_str format: {self.resolution_str}. " + "Expected format like '2deg' or 'nside32'." + ) + if full: + if m.group(1): + return f"rectangular {m.group(1)} degree" + return f"HEALPix nside {m.group(2)}" + return f"{m.group(1)} deg" if m.group(1) else f"NSide {m.group(2)}" + + def _get_duration_str(self, full: bool = False) -> str: + """ + Get formatted duration string. + + Parameters + ---------- + full : bool, optional + If True, return full format (e.g., "6 months"). + If False, return short format (e.g., "6 Mon"). Default is False. + + Returns + ------- + str + Formatted duration string. + """ + if isinstance(self.duration, int): + return f"{self.duration} days" if full else f"{self.duration} Day" + + m = re.match(r"^(\d+)(.*)$", self.duration) + num = int(m.group(1)) + unit = m.group(2).lower() + + if full: + if unit == "yr": + return f"{num} year" if num == 1 else f"{num} years" + elif unit == "mo": + return f"{num} month" if num == 1 else f"{num} months" + return f"{num} {unit}" + + duration = f"{num} {m.group(2).title()}" + return duration + "n" if duration.endswith("Mo") else duration + def to_catdesc(self) -> str: """ Convert the MapDescriptor instance to a human-readable CATDESC string. @@ -188,38 +268,30 @@ def to_catdesc(self) -> str: instrument = instrument.title() sensor = " Combined" if self.sensor == "combined" else self.sensor species = "UV" if self.species == "uv" else self.species.title() - m = re.match( - r"^(drt|ena|int|isn|spx)(?:(?<=spx)\d+)?([^-_\s]*)$", self.principal_data - ) + + data_type, extras = self._parse_principal_data() + + # Quantity (e.g., "Inten", "Rate", "Spectral") quantity = { "drt": "Rate", "ena": "Inten", "int": "Inten", "isn": "Rate", "spx": "Spectral", - }[m.group(1)] - if m.group(1) == "isn": + }[data_type] + + if data_type == "isn": species = "ISN " + species - extras = m.group(2) + coord = self.coordinate_system.upper() - frame = { - "hf": "Helio", - "hk": "Helio Kin", - "sf": "SC", - }[self.frame_descriptor] + frame = {"hf": "Helio", "hk": "Helio Kin", "sf": "SC"}[self.frame_descriptor] survival = "Surv Corr" if self.survival_corrected == "sp" else "No Surv Corr" spin_phase = self.spin_phase.title() - if spin_phase == "Full": - spin_phase = "Full Spin" - m = re.match(r"^(\d+)deg|nside(\d+)", self.resolution_str) - resolution = f"{m.group(1)} deg" if m.group(1) else f"NSide {m.group(2)}" - if isinstance(self.duration, int): - duration = f"{self.duration} Day" - else: - m = re.match(r"^(\d+)(.*)$", self.duration) - duration = f"{m.group(1)} {m.group(2).title()}" - if duration.endswith("Mo"): - duration += "n" + spin_phase = "Full Spin" if spin_phase == "Full" else spin_phase + + resolution = self._get_resolution_str(full=False) + duration = self._get_duration_str(full=False) + catdesc = ( f"IMAP {instrument}{sensor} {species} {quantity}, {coord} " f"{frame} Frame, {survival}, {spin_phase}, {resolution}, {duration}" @@ -234,6 +306,92 @@ def to_catdesc(self) -> str: break return catdesc + def to_logical_source_description(self) -> str: + """ + Convert the MapDescriptor instance to a Logical_source_description string. + + Returns + ------- + str + A full description suitable for the Logical_source_description + global CDF attribute. + """ + # Instrument name (e.g., "IMAP-Hi", "IMAP-Ultra", "IMAP-GLOWS") + instrument_base = self.instrument.name.split("_")[0] + instrument = ( + f"IMAP-{instrument_base}" + if instrument_base in ("IDEX", "GLOWS") + else f"IMAP-{instrument_base.title()}" + ) + + # Sensor (e.g., "45 degree sensor", "combined sensor", "") + if self.sensor == "combined": + sensor = "combined sensor" + elif self.sensor in ("45", "90"): + sensor = f"{self.sensor} degree sensor" + elif self.sensor: + sensor = f"sensor {self.sensor}" + else: + sensor = "" + + # Species (e.g., "Hydrogen", "Helium", "UV") + species_names = {"h": "Hydrogen", "he": "Helium", "o": "Oxygen", "uv": "UV"} + species = species_names.get(self.species.lower(), self.species.title()) + + data_type, _ = self._parse_principal_data() + + # Quantity (e.g., "ENA Intensity", "Rate", "Dust Rate") + quantity = { + "drt": "Dust Rate", + "ena": "ENA Intensity", + "int": "Intensity", + "isn": "Rate", + "spx": "Spectral Index", + }[data_type] + + # Handle special species cases + if data_type == "isn": + species = f"Interstellar Neutral {species}" + elif data_type == "drt": + # Dust rate maps don't have a species + species = "" + + # Frame (e.g., "heliospheric", "spacecraft", "heliocentric kinetic") + frame = INERTIAL_FRAME_LONG_NAMES[self.frame_descriptor] + + # Coordinate system (e.g., "HAE", "GCS") + coord = self.coordinate_system.upper() + + # Survival correction + if self.survival_corrected == "sp": + survival = "with survival probability correction" + else: + survival = "with no survival correction" + + # Spin phase (e.g., "full spin", "ram", "anti-ram") + spin_phase = { + "full": "full spin", + "ram": "ram", + "anti": "anti-ram", + }.get(self.spin_phase.lower(), self.spin_phase) + + duration = self._get_duration_str(full=True) + resolution = self._get_resolution_str(full=True) + + # Build the full description + # Order matches descriptor: instrument-sensor, quantity, species, frame, + # survival, spin_phase, coord, resolution, duration + sensor_part = f" {sensor}" if sensor else "" + species_part = f"{species} " if species else "" + description = ( + f"{instrument} Instrument Level-2{sensor_part} map of {species_part}" + f"{quantity} in the {frame} frame {survival} in the " + f"{spin_phase} direction in {coord} coordinates on {resolution} " + f"tiling over {duration}." + ) + + return description + @property def principal_data_var(self) -> str: """ diff --git a/imap_processing/tests/ena_maps/test_naming.py b/imap_processing/tests/ena_maps/test_naming.py index 0c5d03ffbb..ab68edcca2 100644 --- a/imap_processing/tests/ena_maps/test_naming.py +++ b/imap_processing/tests/ena_maps/test_naming.py @@ -394,3 +394,62 @@ def test_to_catdesc(self, descriptor_str, expected_catdesc): def test_principal_data_var(self, descriptor_str, expected_principal_data_var): md = MapDescriptor.from_string(descriptor_str) assert md.principal_data_var == expected_principal_data_var + + @pytest.mark.parametrize( + "descriptor_str, expected_description", + [ + ( + "h45-ena-h-hf-nsp-full-hae-2deg-6mo", + "IMAP-Hi Instrument Level-2 45 degree sensor map of Hydrogen " + "ENA Intensity in the heliospheric frame with no survival " + "correction in the full spin direction in HAE coordinates on " + "rectangular 2 degree tiling over 6 months.", + ), + ( + "hic-ena-h-hf-sp-ram-hae-nside64-1yr", + "IMAP-Hi Instrument Level-2 combined sensor map of Hydrogen " + "ENA Intensity in the heliospheric frame with survival " + "probability correction in the ram direction in HAE coordinates " + "on HEALPix nside 64 tiling over 1 year.", + ), + ( + "u90-ena-h-hf-nsp-full-hae-nside128-6mo", + "IMAP-Ultra Instrument Level-2 90 degree sensor map of Hydrogen " + "ENA Intensity in the heliospheric frame with no survival " + "correction in the full spin direction in HAE coordinates on " + "HEALPix nside 128 tiling over 6 months.", + ), + ( + "ilo-isn-h-sf-nsp-ram-hae-2deg-3mo", + "IMAP-Lo Instrument Level-2 map of Interstellar Neutral Hydrogen " + "Rate in the spacecraft frame with no survival correction " + "in the ram direction in HAE coordinates on rectangular 2 degree " + "tiling over 3 months.", + ), + ( + "glx-int-uv-hf-nsp-full-hae-2deg-6mo", + "IMAP-GLOWS Instrument Level-2 map of UV Intensity " + "in the heliospheric frame with no survival correction " + "in the full spin direction in HAE coordinates on rectangular " + "2 degree tiling over 6 months.", + ), + ( + "idx-drt-dust-hf-nsp-full-hae-nside32-1yr", + "IMAP-IDEX Instrument Level-2 map of Dust Rate " + "in the heliospheric frame with no survival correction " + "in the full spin direction in HAE coordinates on HEALPix " + "nside 32 tiling over 1 year.", + ), + ( + "u45-ena-he-hk-sp-anti-hae-4deg-2mo", + "IMAP-Ultra Instrument Level-2 45 degree sensor map of Helium " + "ENA Intensity in the heliocentric kinetic frame with survival " + "probability correction in the anti-ram direction in HAE " + "coordinates on rectangular 4 degree tiling over 2 months.", + ), + ], + ) + def test_to_logical_source_description(self, descriptor_str, expected_description): + md = MapDescriptor.from_string(descriptor_str) + actual_description = md.to_logical_source_description() + assert actual_description == expected_description diff --git a/imap_processing/tests/hi/test_hi_l2.py b/imap_processing/tests/hi/test_hi_l2.py index 81db20e041..68b459808e 100644 --- a/imap_processing/tests/hi/test_hi_l2.py +++ b/imap_processing/tests/hi/test_hi_l2.py @@ -174,7 +174,9 @@ def test_hi_l2( # Check some global attributes assert l2_dataset.attrs["Data_type"].startswith(f"L2_{descriptor_str}") assert l2_dataset.attrs["Logical_source"] == f"imap_hi_l2_{descriptor_str}" - assert "Hi90" in l2_dataset.attrs["Logical_source_description"] + assert l2_dataset.attrs["Logical_source_description"].startswith( + "IMAP-Hi Instrument Level-2" + ) assert len(l2_dataset.data_vars) == 15 np.testing.assert_array_equal( diff --git a/imap_processing/ultra/l2/ultra_l2.py b/imap_processing/ultra/l2/ultra_l2.py index b67aa30181..f7cec2e41e 100644 --- a/imap_processing/ultra/l2/ultra_l2.py +++ b/imap_processing/ultra/l2/ultra_l2.py @@ -749,7 +749,7 @@ def ultra_l2( # Get the global attributes, and then fill the sensor, tiling, etc. in the # format-able strings. map_attrs.update(cdf_attrs.get_global_attributes("imap_ultra_l2_enamap")) - for key in ["Data_type", "Logical_source", "Logical_source_description"]: + for key in ["Data_type", "Logical_source"]: map_attrs[key] = map_attrs[key].format( sensor=ultra_sensor_number, tiling=output_map_structure.tiling_type.value.lower(), @@ -763,6 +763,28 @@ def ultra_l2( else f"nside{output_map_structure.nside}" ), inertial_frame_short_name=inertial_frame, + ) + # Use the previously parsed MapDescriptor to generate the + # Logical_source_description + if descriptor is not None: + map_attrs["Logical_source_description"] = ( + map_descriptor.to_logical_source_description() + ) + else: + map_attrs["Logical_source_description"] = map_attrs[ + "Logical_source_description" + ].format( + sensor=ultra_sensor_number, + tiling=output_map_structure.tiling_type.value.lower(), + duration=map_duration, + resolution_string=( + f"{output_map_structure.spacing_deg:.0f}deg" + if ( + output_map_structure.tiling_type + is ena_maps.SkyTilingType.RECTANGULAR + ) + else f"nside{output_map_structure.nside}" + ), inertial_frame_long_name=inertial_frame_long_name, )