Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies = [
"numpy<2",
"matplotlib",
"pandas",
"pint",
"scipy",
"vtk",
"h5py",
Expand Down
301 changes: 151 additions & 150 deletions src/openlifu/util/units.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,147 @@
from __future__ import annotations

import re

import numpy as np
from pint import UnitRegistry
from pint.errors import DimensionalityError, UndefinedUnitError
from xarray import Dataset

ureg = UnitRegistry()
Q_ = ureg.Quantity

_ANGLE_UNITS = {
"rad",
"radian",
"radians",
"deg",
"degree",
"degrees",
"\u00b0",
"\u00c2\u00b0",
}

_UNIT_ALIASES = {
"micron": "micrometer",
"microns": "micrometer",
"um": "micrometer",
"\u00b5m": "micrometer",
"\u03bcm": "micrometer",
"sec": "second",
"secs": "second",
"min": "minute",
"mins": "minute",
"hr": "hour",
"hrs": "hour",
"\u00b0": "degree",
"\u00c2\u00b0": "degree",
"cc": "cm^3",
"kgram": "kilogram",
"kgrams": "kilograms",
"amps": "ampere",
"amp": "ampere",
}

_BASE_UNITS_BY_TYPE = {
"distance": "m",
"area": "m^2",
"volume": "m^3",
"time": "s",
"angle": "rad",
"frequency": "Hz",
"pressure": "Pa",
"watt": "W",
}


def _normalize_unit(unit: str) -> str:
unit = unit.strip()

unit = unit.replace("\u00b2", "^2").replace("\u00b3", "^3")
unit = unit.replace("\u00b5", "u").replace("\u03bc", "u")

# Fix common typos
unit = re.sub(r"sec(s)?\b", "second", unit, flags=re.IGNORECASE)
unit = re.sub(r"\bmili", "milli", unit, flags=re.IGNORECASE)
unit = re.sub(r"grams?\b", "gram", unit, flags=re.IGNORECASE)
unit = re.sub(r"meters?\b", "meter", unit, flags=re.IGNORECASE)
unit = re.sub(r"\s+", " ", unit)

normalized_parts = []
for part in re.split(r"([/*])", unit):
stripped_part = part.strip()
if stripped_part in {"/", "*", ""}:
normalized_parts.append(stripped_part)
continue

part_key = stripped_part.lower()
normalized_parts.append(_UNIT_ALIASES.get(part_key, _normalize_unit_symbol(stripped_part)))

normalized = "".join(normalized_parts)

normalized = re.sub(r"\b([a-zA-Z]+)([23])\b", r"\1^\2", normalized)
return normalized


def _normalize_unit_symbol(unit: str) -> str:
unit = re.sub(r"\b([a-zA-Z]+)([23])\b", r"\1^\2", unit)

for suffix, canonical_suffix in (("hz", "Hz"), ("pa", "Pa")):
suffix_match = re.fullmatch(rf"([A-Za-z]*){suffix}(\^\d+)?", unit, flags=re.IGNORECASE)
if suffix_match:
prefix, power = suffix_match.groups()
return f"{prefix}{canonical_suffix}{power or ''}"

watt_match = re.fullmatch(r"([A-Za-z]*)w(\^\d+)?", unit, flags=re.IGNORECASE)
if watt_match:
prefix, power = watt_match.groups()
return f"{prefix}W{power or ''}"

return unit


def _quantity(unit: str):
return Q_(1, _normalize_unit(unit))


def getunittype(unit):
unit = unit.lower()
if unit in ['micron', 'microns']:
return 'distance'
elif unit in ['minute', 'minutes', 'min', 'mins', 'hour', 'hours', 'hr', 'hrs', 'day', 'days', 'd']:
return 'time'
elif unit in ['rad', 'deg', 'radian', 'radians', 'degree', 'degrees', '°']:
return 'angle'
elif 'sec' in unit:
return 'time'
elif 'meter' in unit or 'micron' in unit:
return 'distance'
elif unit.endswith('s'):
return 'time'
elif unit.endswith('m'):
return 'distance'
elif unit.endswith(('m2', 'm^2')):
return 'area'
elif unit.endswith(('m3', 'm^3')):
return 'volume'
elif unit.endswith('hz'):
return 'frequency'
elif unit.endswith('pa'):
return 'pressure'
elif unit.endswith('w'):
return 'watt'
else:
return 'other'
normalized_unit = _normalize_unit(unit)

if normalized_unit.lower() in _ANGLE_UNITS:
return "angle"

try:
dim = Q_(1, normalized_unit).dimensionality
except (TypeError, UndefinedUnitError):
return "other"

if dim == ureg.meter.dimensionality:
return "distance"
if dim == (ureg.meter**2).dimensionality:
return "area"
if dim == (ureg.meter**3).dimensionality:
return "volume"
if dim == ureg.second.dimensionality:
return "time"
if dim == (1 / ureg.second).dimensionality:
return "frequency"
if dim == ureg.pascal.dimensionality:
return "pressure"
if dim == ureg.watt.dimensionality:
return "watt"

return "other"


def getunitconversion(from_unit, to_unit, unitratio=None, constant=None):
if not from_unit:
return 1.0

if unitratio is not None and constant is not None:
if '/' not in unitratio:
raise ValueError('Conversion unit ratio must have a \'/\' symbol')
if "/" not in unitratio:
raise ValueError("Conversion unit ratio must have a '/' symbol")

unitn, unitd = unitratio.split('/')
unitn, unitd = unitratio.split("/")
type0 = getunittype(from_unit)
type1 = getunittype(to_unit)
typen = getunittype(unitn)
Expand All @@ -54,129 +154,30 @@ def getunitconversion(from_unit, to_unit, unitratio=None, constant=None):
elif type0 == type1:
scl = getunitconversion(from_unit, to_unit)
else:
raise ValueError(f'Unit type mismatch {type0} -> ({typen}/{typed}) -> {type1}')
raise ValueError(f"Unit type mismatch {type0} -> ({typen}/{typed}) -> {type1}")
else:
slash0 = from_unit.find('/')
slash1 = to_unit.find('/')

if slash0 != -1 and slash1 != -1:
num0 = from_unit[:slash0]
denom0 = from_unit[slash0+1:]
num1 = to_unit[:slash1]
denom1 = to_unit[slash1+1:]
scl = getunitconversion(num0, num1) / getunitconversion(denom0, denom1)
elif slash0 == -1 and slash1 == -1:
try:
scl = _quantity(from_unit).to(_normalize_unit(to_unit)).magnitude
except DimensionalityError as exc:
type0 = getunittype(from_unit)
type1 = getunittype(to_unit)
Comment on lines +159 to 163

if type0 != type1:
raise ValueError(f'Unit type mismatch ({type0}) vs ({type1})')

if type0 == 'other':
if from_unit[-1] != to_unit[-1]:
raise ValueError(f'Cannot convert {from_unit} to {to_unit}')

i = 0
while i < min(len(from_unit), len(to_unit)) and from_unit[-i:] == to_unit[-i:]:
type = from_unit[-i:]
i += 1

scl0 = getsiscale(from_unit, type)
scl1 = getsiscale(to_unit, type)
scl = scl0 / scl1
else:
scl0 = getsiscale(from_unit, type0)
scl1 = getsiscale(to_unit, type0)
scl = scl0 / scl1
else:
raise ValueError(f'Unit ratio mismatch ({from_unit} vs {to_unit})')
raise ValueError(f"Unit type mismatch ({type0}) vs ({type1})") from exc
except UndefinedUnitError as exc:
raise ValueError(f"Cannot convert {from_unit} to {to_unit}") from exc

return scl


def getsiscale(unit, type):
type = type.lower()

if type in ['distance', 'area', 'volume']:
idx = unit.find('meters')
if idx == -1:
idx = unit.find('meter')
if idx == -1:
if unit.lower() == 'micron':
idx = 6
else:
idx = unit.rfind('m')
if idx == -1:
idx = len(unit)

elif type == 'time':
idx = unit.find('seconds')
if idx == -1:
idx = unit.find('second')
if idx == -1:
idx = unit.find('sec')
if idx == -1:
idx = unit.rfind('s')
if idx == -1:
idx = len(unit)

elif type == 'angle':
idx = len(unit)

elif type == 'frequency' or type == "pressure":
idx = len(unit) - 2

elif type == "watt":
idx = len(unit) - 1
if type not in _BASE_UNITS_BY_TYPE:
raise ValueError(f"Unknown unit type {type}")

else:
idx = len(unit) - len(type) + 1

prefix = unit[:idx]

if not prefix:
scl = 1.0
else:
scl = 1.0

if prefix == 'pico' or prefix == 'p':
scl = 1.0e-12
elif prefix == 'nano' or prefix == 'n':
scl = 1.0e-9
elif prefix == 'micro' or prefix == 'u' or prefix == '\u00b5' or prefix == '\u03bc':
scl = 1.0e-6
elif prefix == 'milli' or prefix == 'm':
scl = 1.0e-3
elif prefix == 'centi' or prefix == 'c':
scl = 1.0e-2
elif prefix == '':
scl = 1.0
elif prefix == 'kilo' or prefix == 'k':
scl = 1.0e3
elif prefix == 'mega' or prefix == 'M':
scl = 1.0e6
elif prefix == 'giga' or prefix == 'G':
scl = 1.0e9
elif prefix == 'tera' or prefix == 'T':
scl = 1.0e12
elif prefix == 'min' or prefix == 'minute':
scl = 60.0
elif prefix == 'hour' or prefix == 'hr':
scl = 60.0 * 60.0
elif prefix == 'day' or prefix == 'd':
scl = 60.0 * 60.0 * 24.0
elif prefix == 'rad' or prefix == 'radian' or prefix == 'radians':
scl = 1.0
elif prefix == 'deg' or prefix == 'degree' or prefix == 'degrees' or prefix == '\u00b0':
scl = 2 * 3.14159265358979323846 / 360
elif prefix:
raise ValueError(f'Unknown prefix {prefix}')

if type == 'area':
scl = scl ** 2.0
elif type == 'volume':
scl = scl ** 3.0

return scl
try:
return getunitconversion(unit, _BASE_UNITS_BY_TYPE[type])
except ValueError as exc:
raise ValueError(f"Unknown prefix {unit}") from exc


def rescale_data_arr(data_arr: Dataset, units: str) -> Dataset:
Expand All @@ -191,9 +192,9 @@ def rescale_data_arr(data_arr: Dataset, units: str) -> Dataset:
rescaled: The rescaled xarray to new units.
"""
rescaled = data_arr.copy(deep=True)
scale = getunitconversion(data_arr.attrs['units'], units)
scale = getunitconversion(data_arr.attrs["units"], units)
rescaled.data *= scale
rescaled.attrs['units'] = units
rescaled.attrs["units"] = units

return rescaled

Expand All @@ -212,12 +213,12 @@ def rescale_coords(data_arr: Dataset, units: str) -> Dataset:
rescaled = data_arr.copy(deep=True)
for coord_key in data_arr.coords:
curr_coord_attrs = rescaled[coord_key].attrs
if 'units' in curr_coord_attrs:
curr_coord_units = curr_coord_attrs['units']
if "units" in curr_coord_attrs:
curr_coord_units = curr_coord_attrs["units"]
scale = getunitconversion(curr_coord_units, units)
curr_coord_rescaled = scale*rescaled[coord_key].data
curr_coord_rescaled = scale * rescaled[coord_key].data
rescaled = rescaled.assign_coords({coord_key: (coord_key, curr_coord_rescaled, curr_coord_attrs)})
rescaled[coord_key].attrs['units'] = units
rescaled[coord_key].attrs["units"] = units

return rescaled

Expand All @@ -237,7 +238,7 @@ def get_ndgrid_from_arr(data_arr: Dataset) -> np.ndarray:
ordered_key = data_arr[first_data_key].dims
all_coord = []
for coord_key in ordered_key:
if 'units' in data_arr[coord_key].attrs:
if "units" in data_arr[coord_key].attrs:
all_coord += [data_arr.coords[coord_key].data]
ndgrid = np.stack(np.meshgrid(*all_coord, indexing="ij"), axis=-1)

Expand Down
Loading
Loading