diff --git a/src/mdio/converters/mdio.py b/src/mdio/converters/mdio.py index 489499ec..ce59f745 100644 --- a/src/mdio/converters/mdio.py +++ b/src/mdio/converters/mdio.py @@ -14,6 +14,7 @@ from mdio.segy.blocked_io import to_segy from mdio.segy.creation import concat_files from mdio.segy.creation import mdio_spec_to_segy +from mdio.segy.utilities import project_headers_to_segy_spec from mdio.segy.utilities import segy_export_rechunker try: @@ -132,11 +133,13 @@ def mdio_to_segy( # noqa: PLR0912, PLR0913, PLR0915 out_dir = output_path.parent tmp_dir = TemporaryDirectory(dir=out_dir) + projected_headers = project_headers_to_segy_spec(dataset["headers"].data, segy_spec) + with tmp_dir: with TqdmCallback(desc="Unwrapping MDIO Blocks"): block_records = to_segy( samples=dataset[default_variable_name].data, - headers=dataset["headers"].data, + headers=projected_headers, live_mask=dataset["trace_mask"].data, segy_factory=segy_factory, file_root=tmp_dir.name, diff --git a/src/mdio/segy/utilities.py b/src/mdio/segy/utilities.py index 1055476e..f5e32660 100644 --- a/src/mdio/segy/utilities.py +++ b/src/mdio/segy/utilities.py @@ -15,8 +15,11 @@ from mdio.segy.parsers import parse_headers if TYPE_CHECKING: + from dask.array import Array as DaskArray from numpy.typing import DTypeLike + from numpy.typing import NDArray from segy.arrays import HeaderArray + from segy.schema import SegySpec from mdio.builder.templates.base import AbstractDatasetTemplate from mdio.segy.file import SegyFileArguments @@ -183,6 +186,51 @@ def get_grid_plan( # noqa: C901, PLR0912, PLR0913, PLR0915 return dimensions, chunksize +def project_headers_to_segy_spec(headers: DaskArray, segy_spec: SegySpec) -> DaskArray: + """Project stored MDIO trace headers onto the SegySpec trace header layout. + + ``SegyFactory.create_traces`` assigns headers by numpy structured-array slot position, + not by field name, so the input must expose exactly the SegySpec fields in SegySpec + order. A packed (no-padding), native-byte-order dtype is used to avoid numpy byteswap + artifacts over padding bytes. + + Args: + headers: Dask array holding MDIO trace headers with structured dtype. + segy_spec: Target SegySpec describing the output SEG-Y trace header layout. + + Returns: + Dask array with a packed, native-byte-order dtype ordered like the SegySpec. + + Raises: + ValueError: If SegySpec requests header fields that do not exist in MDIO headers. + """ + spec_header_dtype = segy_spec.trace.header.dtype + target_names = list(spec_header_dtype.names) + + source_names = headers.dtype.names + missing = [name for name in target_names if name not in source_names] + if missing: + msg = ( + f"SegySpec requires trace header fields not present in MDIO: {missing}. " + f"Available MDIO header fields: {sorted(source_names)}." + ) + raise ValueError(msg) + + target_dtype = np.dtype([(name, spec_header_dtype.fields[name][0].newbyteorder("=")) for name in target_names]) + + # Don't actually project if the dtype is already the same as the target dtype. + if headers.dtype == target_dtype: + return headers + + def _project_block(block: NDArray) -> NDArray: + out = np.empty(block.shape, dtype=target_dtype) + for name in target_names: + out[name] = block[name] + return out + + return headers.map_blocks(_project_block, dtype=target_dtype) + + def find_trailing_ones_index(dim_blocks: tuple[int, ...]) -> int: """Finds the index where trailing '1's begin in a tuple of dimension block sizes. diff --git a/tests/integration/test_segy_import_export_masked.py b/tests/integration/test_segy_import_export_masked.py index af91c9d9..ebcbeee0 100644 --- a/tests/integration/test_segy_import_export_masked.py +++ b/tests/integration/test_segy_import_export_masked.py @@ -18,7 +18,9 @@ from numpy.testing import assert_array_equal from segy.factory import SegyFactory from segy.schema import HeaderField +from segy.schema import HeaderSpec from segy.schema import SegySpec +from segy.schema import TraceSpec from segy.standards import get_segy_standard from mdio import mdio_to_segy @@ -560,3 +562,125 @@ def read_segy_trace_header(trace_index: int) -> bytes: assert_array_equal(mdio_header_bytes, segy_header_bytes) segy_trace_idx += 1 + + +def _spec_with_trace_header_fields(base_spec: SegySpec, fields: list[HeaderField]) -> SegySpec: + """Return a copy of base_spec whose trace header contains exactly the given fields. + + Unlike ``SegySpec.customize``, which merges into the base field set, this builds a + fresh ``HeaderSpec`` so we can produce true subsets and arbitrary orderings used in + the header projection tests. + """ + trace = TraceSpec( + header=HeaderSpec( + fields=fields, + item_size=base_spec.trace.header.item_size, + endianness=base_spec.trace.header.endianness, + ), + data=base_spec.trace.data, + ) + return SegySpec( + segy_standard=base_spec.segy_standard, + text_header=base_spec.text_header, + binary_header=base_spec.binary_header, + trace=trace, + endianness=base_spec.endianness, + ) + + +@dataclass +class ProjectionFixture: + """Pre-ingested MDIO and baseline SegySpec shared by header projection tests.""" + + root: Path + mdio_path: Path + baseline_spec: SegySpec + + +@pytest.fixture(scope="module") +def projection_mdio(tmp_path_factory: pytest.TempPathFactory) -> ProjectionFixture: + """Ingest a small 3D-stack SEG-Y once for the header projection tests.""" + grid_conf, factory_conf, to_mdio_conf, _ = STACK_3D_CONF + root = tmp_path_factory.mktemp("header_projection") + segy_path = root / f"{grid_conf.name}.sgy" + mdio_path = root / f"{grid_conf.name}.mdio" + + mock_nd_segy(segy_path, grid_conf, factory_conf) + baseline_spec = _segy_spec_mock_nd_segy(grid_conf, factory_conf) + segy_to_mdio( + segy_spec=baseline_spec, + mdio_template=TemplateRegistry().get(to_mdio_conf.template), + input_path=segy_path, + output_path=mdio_path, + overwrite=True, + ) + return ProjectionFixture(root=root, mdio_path=mdio_path, baseline_spec=baseline_spec) + + +class TestExportSegySpecHeaderProjection: + """Verify mdio_to_segy supports subset and reordered SegySpecs (issue #769).""" + + def test_reordered_spec_produces_identical_bytes(self, projection_mdio: ProjectionFixture) -> None: + """Reordering trace header fields must not change the serialized SEG-Y.""" + baseline_spec = projection_mdio.baseline_spec + reordered_spec = _spec_with_trace_header_fields( + baseline_spec, + list(reversed(baseline_spec.trace.header.fields)), + ) + + baseline_out = projection_mdio.root / "baseline.sgy" + reordered_out = projection_mdio.root / "reordered.sgy" + + mdio_to_segy(segy_spec=baseline_spec, input_path=projection_mdio.mdio_path, output_path=baseline_out) + mdio_to_segy(segy_spec=reordered_spec, input_path=projection_mdio.mdio_path, output_path=reordered_out) + + assert baseline_out.read_bytes() == reordered_out.read_bytes() + + def test_subset_spec_preserves_selected_fields(self, projection_mdio: ProjectionFixture) -> None: + """Subsetting trace header fields writes those fields at their declared byte locations.""" + baseline_spec = projection_mdio.baseline_spec + + subset_fields = [ + HeaderField(name="crossline", byte=193, format="int32"), + HeaderField(name="inline", byte=189, format="int32"), + HeaderField(name="samples_per_trace", byte=115, format="int16"), + HeaderField(name="sample_interval", byte=117, format="int16"), + ] + subset_spec = _spec_with_trace_header_fields(baseline_spec, subset_fields) + + baseline_out = projection_mdio.root / "baseline_for_subset.sgy" + subset_out = projection_mdio.root / "subset.sgy" + + mdio_to_segy(segy_spec=baseline_spec, input_path=projection_mdio.mdio_path, output_path=baseline_out) + mdio_to_segy(segy_spec=subset_spec, input_path=projection_mdio.mdio_path, output_path=subset_out) + + baseline_sgy = SegyFileWrapper(baseline_out, spec=baseline_spec) + subset_sgy = SegyFileWrapper(subset_out, spec=subset_spec) + + assert baseline_sgy.num_traces == subset_sgy.num_traces + + baseline_traces = baseline_sgy.trace[:] + subset_traces = subset_sgy.trace[:] + + for name in ("inline", "crossline", "samples_per_trace", "sample_interval"): + assert_array_equal(subset_traces.header[name], baseline_traces.header[name]) + assert_array_equal(subset_traces.sample, baseline_traces.sample) + + def test_missing_field_in_mdio_raises(self, projection_mdio: ProjectionFixture) -> None: + """SegySpec with a field absent from MDIO headers must raise ValueError.""" + # 'offset' is not ingested for STACK_3D_CONF; requesting it must fail. + bad_fields = [ + HeaderField(name="inline", byte=189, format="int32"), + HeaderField(name="crossline", byte=193, format="int32"), + HeaderField(name="offset", byte=37, format="int32"), + HeaderField(name="samples_per_trace", byte=115, format="int16"), + HeaderField(name="sample_interval", byte=117, format="int16"), + ] + bad_spec = _spec_with_trace_header_fields(projection_mdio.baseline_spec, bad_fields) + + with pytest.raises(ValueError, match="offset"): + mdio_to_segy( + segy_spec=bad_spec, + input_path=projection_mdio.mdio_path, + output_path=projection_mdio.root / "should_not_exist.sgy", + ) diff --git a/tests/unit/test_segy_header_projection.py b/tests/unit/test_segy_header_projection.py new file mode 100644 index 00000000..f5f3d9c8 --- /dev/null +++ b/tests/unit/test_segy_header_projection.py @@ -0,0 +1,190 @@ +"""Tests for SEG-Y trace header projection used during MDIO to SEG-Y export.""" + +from __future__ import annotations + +import dask.array as da +import numpy as np +import pytest +from segy.schema import HeaderField +from segy.schema import HeaderSpec +from segy.schema import SegySpec +from segy.schema import TraceDataSpec +from segy.schema import TraceSpec +from segy.standards import get_segy_standard + +from mdio.segy.utilities import project_headers_to_segy_spec + + +def _make_segy_spec(fields: list[HeaderField]) -> SegySpec: + """Build a SegySpec whose trace header contains exactly the given fields.""" + base = get_segy_standard(1.0) + trace = TraceSpec( + header=HeaderSpec(fields=fields, item_size=240), + data=TraceDataSpec(format="ibm32"), + ) + return SegySpec( + segy_standard=base.segy_standard, + text_header=base.text_header, + binary_header=base.binary_header, + trace=trace, + ) + + +def _mdio_headers(num: int) -> np.ndarray: + """Create a structured array simulating MDIO-stored headers (superset of fields).""" + dtype = np.dtype( + [ + ("inline", " None: + """A SegySpec with fewer fields than MDIO yields only those fields.""" + headers = da.from_array(_mdio_headers(8), chunks=4) + spec = _make_segy_spec( + [ + HeaderField(name="inline", byte=189, format="int32"), + HeaderField(name="crossline", byte=193, format="int32"), + ] + ) + + projected = project_headers_to_segy_spec(headers, spec).compute() + + assert list(projected.dtype.names) == ["inline", "crossline"] + np.testing.assert_array_equal(projected["inline"], np.arange(8, dtype=np.int32)) + np.testing.assert_array_equal(projected["crossline"], np.arange(8, dtype=np.int32) * 2) + + def test_reordered_spec_preserves_values_per_field_name(self) -> None: + """Reordering SegySpec fields keeps the per-name values intact.""" + headers = da.from_array(_mdio_headers(6), chunks=3) + # Deliberately reverse the MDIO storage order. + spec = _make_segy_spec( + [ + HeaderField(name="cdp_y", byte=185, format="int32"), + HeaderField(name="cdp_x", byte=181, format="int32"), + HeaderField(name="crossline", byte=193, format="int32"), + HeaderField(name="inline", byte=189, format="int32"), + ] + ) + + projected = project_headers_to_segy_spec(headers, spec).compute() + + assert list(projected.dtype.names) == ["cdp_y", "cdp_x", "crossline", "inline"] + np.testing.assert_array_equal(projected["inline"], np.arange(6, dtype=np.int32)) + np.testing.assert_array_equal(projected["crossline"], np.arange(6, dtype=np.int32) * 2) + np.testing.assert_array_equal(projected["cdp_x"], np.arange(6, dtype=np.int32) * 10) + np.testing.assert_array_equal(projected["cdp_y"], np.arange(6, dtype=np.int32) * 20) + + def test_missing_field_raises_value_error(self) -> None: + """Requesting a field not present in MDIO raises ValueError.""" + headers = da.from_array(_mdio_headers(4), chunks=2) + spec = _make_segy_spec( + [ + HeaderField(name="inline", byte=189, format="int32"), + HeaderField(name="offset", byte=37, format="int32"), + ] + ) + + with pytest.raises(ValueError, match="offset"): + project_headers_to_segy_spec(headers, spec) + + def test_output_dtype_is_packed_native_matching_spec_field_order(self) -> None: + """Projected dtype is packed (no gaps), native-order, and ordered like the SegySpec. + + A packed dtype sidesteps numpy byteswap artifacts on structured arrays with + explicit field offsets / padding bytes. Downstream SegyFactory assignment relies on + positional (not name-based) copying, so ordering must match the SegySpec. + """ + headers = da.from_array(_mdio_headers(2), chunks=2) + spec = _make_segy_spec( + [ + HeaderField(name="inline", byte=189, format="int32"), + HeaderField(name="crossline", byte=193, format="int32"), + ] + ) + + projected = project_headers_to_segy_spec(headers, spec) + + spec_names = list(spec.trace.header.dtype.names) + assert list(projected.dtype.names) == spec_names + packed_itemsize = sum(spec.trace.header.dtype.fields[name][0].itemsize for name in spec_names) + assert projected.dtype.itemsize == packed_itemsize + for name in spec_names: + spec_field_dtype = spec.trace.header.dtype.fields[name][0] + assert projected.dtype.fields[name][0] == spec_field_dtype.newbyteorder("=") + + def test_projection_independent_of_mdio_field_order(self) -> None: + """Reordering the MDIO source fields must not change projected values.""" + original = _mdio_headers(5) + shuffled_dtype = np.dtype( + [ + ("cdp_y", " None: + """If source headers dtype exactly matches target dtype, return array unchanged.""" + spec = _make_segy_spec( + [ + HeaderField(name="inline", byte=189, format="int32"), + HeaderField(name="crossline", byte=193, format="int32"), + ] + ) + + target_dtype = np.dtype( + [ + ("inline", spec.trace.header.dtype.fields["inline"][0].newbyteorder("=")), + ("crossline", spec.trace.header.dtype.fields["crossline"][0].newbyteorder("=")), + ] + ) + + # Create a dask array that perfectly matches the target_dtype + headers_np = np.zeros(5, dtype=target_dtype) + headers_np["inline"] = np.arange(5) + headers_np["crossline"] = np.arange(5) * 2 + headers_da = da.from_array(headers_np, chunks=5) + + projected_da = project_headers_to_segy_spec(headers_da, spec) + + # The exact same dask array object should be returned (no map_blocks overhead) + assert projected_da is headers_da