From a6f9c79683c681d07adca20476a3a39e30b6f824 Mon Sep 17 00:00:00 2001 From: SchrodingersCattt Date: Mon, 29 Jun 2026 09:33:45 +0000 Subject: [PATCH 1/3] Add disorder-aware CIF export demo --- .../disorder_workflow_demo/ReadMe.md | 46 +++ .../demo_csd_disorder_workflow.py | 280 ++++++++++++++++++ 2 files changed, 326 insertions(+) create mode 100644 scripts/materials/disorder_workflow_demo/ReadMe.md create mode 100644 scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py diff --git a/scripts/materials/disorder_workflow_demo/ReadMe.md b/scripts/materials/disorder_workflow_demo/ReadMe.md new file mode 100644 index 0000000..ce42aa1 --- /dev/null +++ b/scripts/materials/disorder_workflow_demo/ReadMe.md @@ -0,0 +1,46 @@ +# Disorder Workflow Demo + +## Summary + +Exports a CSD entry to a manually assembled CIF that preserves atom-site occupancy, +disorder assembly/group annotations, anisotropic displacement parameters, and bond +connectivity. + +This example demonstrates a workflow that is not covered by the built-in CIF writers: +reading a disordered entry from the CSD Python API and reconstructing a CIF with +explicit `_atom_site_occupancy`, `_atom_site_disorder_assembly`, and +`_atom_site_disorder_group` fields. + +Example output: + +```text +Licence requirement: CSD-Materials or another licence tier with CSD Python API database access +Exported ABACIR -> output/ABACIR.cif (atoms=148, partial_occ=16, bonds=154) +Exported ABABUB -> output/ABABUB.cif (atoms=84, partial_occ=8, bonds=88) +``` + +## Requirements + +- CSD Python API with database access +- Python 3.10+ + +## Licensing Requirements + +CSD-Materials or another licence tier that provides access to the CSD Python API and CSD database. + +## Instructions on Running + +```bash +python demo_csd_disorder_workflow.py --refcodes ABACIR ABABUB --output-dir ./output +``` + +Arguments: + +- `--refcodes`: one or more CSD refcodes +- `--output-dir`: destination directory for exported CIF files + +## Author + +Ming-Yu Guo (2026-06-29) + +> For feedback or to report any issues please contact [support@ccdc.cam.ac.uk](mailto:support@ccdc.cam.ac.uk) diff --git a/scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py b/scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py new file mode 100644 index 0000000..8c6922d --- /dev/null +++ b/scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python +# +# This script can be used for any purpose without limitation subject to the +# conditions at https://www.ccdc.cam.ac.uk/Community/Pages/Licences/v2.aspx +# +# This permission notice and the following statement of attribution must be +# included in all copies or substantial portions of this script. +# +# 2026-06-29: created by Ming-Yu Guo +# + +"""Demo: export full disorder-aware CIF data from the CSD Python API. + +This script demonstrates a complete workflow for extracting crystal structures +from the Cambridge Structural Database (CSD) with atom-site occupancy, +disorder-group metadata, anisotropic displacement parameters, and bond +connectivity preserved in a manually assembled CIF file. + +Why this script exists: + The built-in CSD Python API CIF writers are convenient, but for some + disordered entries they do not expose a simple path to preserve the full + occupancy/disorder atom-site metadata needed for downstream enumeration or + custom reconstruction workflows. + +Pipeline: + 1. Read an entry from the CSD with ``EntryReader``. + 2. Access ``crystal.disordered_molecule`` when available. + 3. Collect atom-site occupancy, disorder assembly/group, ADP, and bonds. + 4. Write a CIF with explicit ``_atom_site_occupancy`` and disorder tags. + +Examples: + python demo_csd_disorder_workflow.py --refcodes ABACIR ABABUB --output-dir ./output +""" + +from __future__ import annotations + +import argparse +from pathlib import Path +from typing import Any + + +LICENSE_REQUIREMENT = "CSD-Materials or another licence tier with CSD Python API database access" + + +def _format_float(value: Any, precision: int = 4) -> str: + if value is None: + return "?" + return f"{float(value):.{precision}f}" + + +def _format_esd(value: Any, esd: Any, precision: int = 4) -> str: + if value is None: + return "?" + base = f"{float(value):.{precision}f}" + if esd is None: + return base + try: + if float(esd) > 0: + return f"{base}({esd})" + except (TypeError, ValueError): + pass + return base + + +def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]: + """Export a CSD structure to CIF with occupancy and disorder metadata.""" + from ccdc.io import EntryReader + + with EntryReader("CSD") as reader: + entry = reader.entry(refcode) + crystal = reader.crystal(refcode) + + molecule = crystal.disordered_molecule or crystal.molecule + + disorder_map: dict[str, tuple[int | None, str | None]] = {} + disorder = getattr(crystal, "disorder", None) + if disorder is not None: + try: + for assembly in disorder.assemblies: + assembly_id = str(getattr(assembly, "id", "")) + for group in assembly.groups: + group_id = int(getattr(group, "id", 0)) + for atom in group.atoms: + disorder_map[str(atom.label)] = (group_id, assembly_id) + except Exception: + pass + + atom_data: list[dict[str, Any]] = [] + for atom in molecule.atoms: + label = str(atom.label) if atom.label else "?" + symbol = str(atom.atomic_symbol) if atom.atomic_symbol else "?" + fractional = atom.fractional_coordinates + occupancy = 1.0 + try: + if atom.occupancy is not None: + occupancy = float(atom.occupancy) + except Exception: + pass + + displacement_type = "?" + u_iso = None + aniso = None + try: + displacement = atom.displacement_parameters + if displacement is not None: + u_iso = float(displacement.isotropic_equivalent) + if displacement.type == "Anisotropic": + displacement_type = "Uani" + values = displacement.values + uncertainties = displacement.uncertainties + aniso = { + "u11": values[0][0], "u22": values[1][1], "u33": values[2][2], + "u12": values[0][1], "u13": values[0][2], "u23": values[1][2], + "e11": uncertainties[0][0], "e22": uncertainties[1][1], "e33": uncertainties[2][2], + "e12": uncertainties[0][1], "e13": uncertainties[0][2], "e23": uncertainties[1][2], + } + else: + displacement_type = "Uiso" + except Exception: + pass + + disorder_group, disorder_assembly = disorder_map.get(label, (None, None)) + atom_data.append({ + "label": label, + "symbol": symbol, + "fx": float(fractional.x) if fractional else None, + "fy": float(fractional.y) if fractional else None, + "fz": float(fractional.z) if fractional else None, + "occupancy": occupancy, + "u_iso": u_iso, + "displacement_type": displacement_type, + "disorder_group": disorder_group, + "disorder_assembly": disorder_assembly, + "aniso": aniso, + }) + + bonds: list[tuple[str, str, float, str]] = [] + try: + for bond in molecule.bonds: + atom_1, atom_2 = bond.atoms + bonds.append((str(atom_1.label), str(atom_2.label), float(bond.length), str(bond.bond_type))) + except Exception: + pass + + cell_a, cell_b, cell_c = crystal.cell_lengths + alpha, beta, gamma = crystal.cell_angles + disorder_present = any(atom["disorder_group"] is not None for atom in atom_data) + + lines = [ + f"data_{refcode}", + "_audit_creation_method 'CSD Python API disorder workflow demo'", + f"_chemical_name_common '{(entry.chemical_name or '?').replace(chr(39), chr(39) + chr(39))}'", + f"_chemical_formula_sum '{entry.formula or '?'}'", + f"_cell_length_a {_format_float(cell_a)}", + f"_cell_length_b {_format_float(cell_b)}", + f"_cell_length_c {_format_float(cell_c)}", + f"_cell_angle_alpha {_format_float(alpha, 2)}", + f"_cell_angle_beta {_format_float(beta, 2)}", + f"_cell_angle_gamma {_format_float(gamma, 2)}", + f"_cell_volume {_format_float(crystal.cell_volume, 2)}", + f"_cell_formula_units_Z {int(crystal.z_value) if crystal.z_value else '?'}", + f"_symmetry_space_group_name_H-M '{crystal.spacegroup_symbol or '?'}'", + "", + ] + + if crystal.symmetry_operators: + lines.extend(["loop_", "_symmetry_equiv_pos_as_xyz"]) + for operator in crystal.symmetry_operators: + lines.append(f" '{operator}'") + lines.append("") + + lines.extend([ + "loop_", + "_atom_site_label", + "_atom_site_type_symbol", + "_atom_site_fract_x", + "_atom_site_fract_y", + "_atom_site_fract_z", + "_atom_site_occupancy", + "_atom_site_U_iso_or_equiv", + "_atom_site_thermal_displace_type", + ]) + if disorder_present: + lines.extend([ + "_atom_site_disorder_assembly", + "_atom_site_disorder_group", + ]) + + for atom in atom_data: + if atom["fx"] is None: + continue + row = [ + f"{atom['label']:<8s}", + f"{atom['symbol']:<4s}", + f"{atom['fx']:12.6f}", + f"{atom['fy']:12.6f}", + f"{atom['fz']:12.6f}", + f"{atom['occupancy']:8.4f}", + f"{_format_float(atom['u_iso']):>10s}", + f"{atom['displacement_type']:>5s}", + ] + if disorder_present: + row.append(f"{str(atom['disorder_assembly']) if atom['disorder_assembly'] is not None else '.':>4s}") + row.append(f"{str(atom['disorder_group']) if atom['disorder_group'] is not None else '.':>4s}") + lines.append(" " + " ".join(row)) + lines.append("") + + anisotropic_atoms = [atom for atom in atom_data if atom["aniso"]] + if anisotropic_atoms: + lines.extend([ + "loop_", + "_atom_site_aniso_label", + "_atom_site_aniso_U_11", + "_atom_site_aniso_U_22", + "_atom_site_aniso_U_33", + "_atom_site_aniso_U_12", + "_atom_site_aniso_U_13", + "_atom_site_aniso_U_23", + ]) + for atom in anisotropic_atoms: + u = atom["aniso"] + lines.append( + f" {atom['label']:<8s}" + f" {_format_esd(u['u11'], u['e11']):>12s}" + f" {_format_esd(u['u22'], u['e22']):>12s}" + f" {_format_esd(u['u33'], u['e33']):>12s}" + f" {_format_esd(u['u12'], u['e12']):>12s}" + f" {_format_esd(u['u13'], u['e13']):>12s}" + f" {_format_esd(u['u23'], u['e23']):>12s}" + ) + lines.append("") + + if bonds: + lines.extend([ + "loop_", + "_geom_bond_atom_site_label_1", + "_geom_bond_atom_site_label_2", + "_geom_bond_distance", + "_ccdc_geom_bond_type", + ]) + for atom_1, atom_2, distance, bond_type in bonds: + lines.append(f" {atom_1:<8s} {atom_2:<8s} {distance:8.4f} {bond_type}") + lines.append("") + + lines.append("#END") + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text("\n".join(lines), encoding="utf-8") + + return { + "refcode": refcode, + "output": str(output_path), + "n_atoms": len(atom_data), + "n_partial_occupancy": sum(1 for atom in atom_data if atom["occupancy"] < 0.999), + "n_bonds": len(bonds), + } + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--refcodes", nargs="+", required=True, help="CSD refcodes to export") + parser.add_argument("--output-dir", default="output", help="Directory for exported CIF files") + return parser.parse_args() + + +def main() -> None: + args = parse_args() + output_dir = Path(args.output_dir) + + print(f"Licence requirement: {LICENSE_REQUIREMENT}") + for refcode in args.refcodes: + output_path = output_dir / f"{refcode}.cif" + summary = export_full_cif_from_csd(refcode, output_path) + print( + f"Exported {summary['refcode']} -> {summary['output']} " + f"(atoms={summary['n_atoms']}, partial_occ={summary['n_partial_occupancy']}, bonds={summary['n_bonds']})" + ) + + +if __name__ == "__main__": + main() From 0af27567b215cf2653ae6fbdfb28670dba62b8db Mon Sep 17 00:00:00 2001 From: SchrodingersCattt Date: Mon, 29 Jun 2026 09:56:07 +0000 Subject: [PATCH 2/3] Refactor disorder export demo for Sonar --- .../demo_csd_disorder_workflow.py | 260 +++++++++++------- 1 file changed, 160 insertions(+), 100 deletions(-) diff --git a/scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py b/scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py index 8c6922d..74a9a8b 100644 --- a/scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py +++ b/scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py @@ -42,6 +42,10 @@ LICENSE_REQUIREMENT = "CSD-Materials or another licence tier with CSD Python API database access" +def _escape_cif_string(value: str | None) -> str: + return (value or "?").replace("'", "''") + + def _format_float(value: Any, precision: int = 4) -> str: if value is None: return "?" @@ -62,63 +66,64 @@ def _format_esd(value: Any, esd: Any, precision: int = 4) -> str: return base -def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]: - """Export a CSD structure to CIF with occupancy and disorder metadata.""" - from ccdc.io import EntryReader +def _build_disorder_map(crystal: Any) -> dict[str, tuple[int | None, str | None]]: + disorder_map: dict[str, tuple[int | None, str | None]] = {} + disorder = getattr(crystal, "disorder", None) + if disorder is None: + return disorder_map - with EntryReader("CSD") as reader: - entry = reader.entry(refcode) - crystal = reader.crystal(refcode) + try: + for assembly in disorder.assemblies: + assembly_id = str(getattr(assembly, "id", "")) + for group in assembly.groups: + group_id = int(getattr(group, "id", 0)) + for atom in group.atoms: + disorder_map[str(atom.label)] = (group_id, assembly_id) + except Exception: + return disorder_map + return disorder_map - molecule = crystal.disordered_molecule or crystal.molecule - disorder_map: dict[str, tuple[int | None, str | None]] = {} - disorder = getattr(crystal, "disorder", None) - if disorder is not None: - try: - for assembly in disorder.assemblies: - assembly_id = str(getattr(assembly, "id", "")) - for group in assembly.groups: - group_id = int(getattr(group, "id", 0)) - for atom in group.atoms: - disorder_map[str(atom.label)] = (group_id, assembly_id) - except Exception: - pass +def _extract_occupancy(atom: Any) -> float: + try: + if atom.occupancy is not None: + return float(atom.occupancy) + except Exception: + pass + return 1.0 + +def _extract_displacement(atom: Any) -> tuple[str, float | None, dict[str, Any] | None]: + displacement_type = "?" + u_iso = None + aniso = None + try: + displacement = atom.displacement_parameters + if displacement is None: + return displacement_type, u_iso, aniso + u_iso = float(displacement.isotropic_equivalent) + if displacement.type != "Anisotropic": + return "Uiso", u_iso, aniso + values = displacement.values + uncertainties = displacement.uncertainties + aniso = { + "u11": values[0][0], "u22": values[1][1], "u33": values[2][2], + "u12": values[0][1], "u13": values[0][2], "u23": values[1][2], + "e11": uncertainties[0][0], "e22": uncertainties[1][1], "e33": uncertainties[2][2], + "e12": uncertainties[0][1], "e13": uncertainties[0][2], "e23": uncertainties[1][2], + } + return "Uani", u_iso, aniso + except Exception: + return displacement_type, u_iso, aniso + + +def _extract_atom_data(molecule: Any, disorder_map: dict[str, tuple[int | None, str | None]]) -> list[dict[str, Any]]: atom_data: list[dict[str, Any]] = [] for atom in molecule.atoms: label = str(atom.label) if atom.label else "?" symbol = str(atom.atomic_symbol) if atom.atomic_symbol else "?" fractional = atom.fractional_coordinates - occupancy = 1.0 - try: - if atom.occupancy is not None: - occupancy = float(atom.occupancy) - except Exception: - pass - - displacement_type = "?" - u_iso = None - aniso = None - try: - displacement = atom.displacement_parameters - if displacement is not None: - u_iso = float(displacement.isotropic_equivalent) - if displacement.type == "Anisotropic": - displacement_type = "Uani" - values = displacement.values - uncertainties = displacement.uncertainties - aniso = { - "u11": values[0][0], "u22": values[1][1], "u33": values[2][2], - "u12": values[0][1], "u13": values[0][2], "u23": values[1][2], - "e11": uncertainties[0][0], "e22": uncertainties[1][1], "e33": uncertainties[2][2], - "e12": uncertainties[0][1], "e13": uncertainties[0][2], "e23": uncertainties[1][2], - } - else: - displacement_type = "Uiso" - except Exception: - pass - + displacement_type, u_iso, aniso = _extract_displacement(atom) disorder_group, disorder_assembly = disorder_map.get(label, (None, None)) atom_data.append({ "label": label, @@ -126,30 +131,34 @@ def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]: "fx": float(fractional.x) if fractional else None, "fy": float(fractional.y) if fractional else None, "fz": float(fractional.z) if fractional else None, - "occupancy": occupancy, + "occupancy": _extract_occupancy(atom), "u_iso": u_iso, "displacement_type": displacement_type, "disorder_group": disorder_group, "disorder_assembly": disorder_assembly, "aniso": aniso, }) + return atom_data + +def _extract_bonds(molecule: Any) -> list[tuple[str, str, float, str]]: bonds: list[tuple[str, str, float, str]] = [] try: for bond in molecule.bonds: atom_1, atom_2 = bond.atoms bonds.append((str(atom_1.label), str(atom_2.label), float(bond.length), str(bond.bond_type))) except Exception: - pass + return bonds + return bonds + +def _build_header_lines(refcode: str, entry: Any, crystal: Any) -> list[str]: cell_a, cell_b, cell_c = crystal.cell_lengths alpha, beta, gamma = crystal.cell_angles - disorder_present = any(atom["disorder_group"] is not None for atom in atom_data) - - lines = [ + return [ f"data_{refcode}", "_audit_creation_method 'CSD Python API disorder workflow demo'", - f"_chemical_name_common '{(entry.chemical_name or '?').replace(chr(39), chr(39) + chr(39))}'", + f"_chemical_name_common '{_escape_cif_string(entry.chemical_name)}'", f"_chemical_formula_sum '{entry.formula or '?'}'", f"_cell_length_a {_format_float(cell_a)}", f"_cell_length_b {_format_float(cell_b)}", @@ -163,13 +172,20 @@ def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]: "", ] - if crystal.symmetry_operators: - lines.extend(["loop_", "_symmetry_equiv_pos_as_xyz"]) - for operator in crystal.symmetry_operators: - lines.append(f" '{operator}'") - lines.append("") - lines.extend([ +def _build_symmetry_lines(crystal: Any) -> list[str]: + if not crystal.symmetry_operators: + return [] + lines = ["loop_", "_symmetry_equiv_pos_as_xyz"] + for operator in crystal.symmetry_operators: + lines.append(f" '{operator}'") + lines.append("") + return lines + + +def _build_atom_site_lines(atom_data: list[dict[str, Any]]) -> list[str]: + disorder_present = any(atom["disorder_group"] is not None for atom in atom_data) + lines = [ "loop_", "_atom_site_label", "_atom_site_type_symbol", @@ -179,7 +195,7 @@ def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]: "_atom_site_occupancy", "_atom_site_U_iso_or_equiv", "_atom_site_thermal_displace_type", - ]) + ] if disorder_present: lines.extend([ "_atom_site_disorder_assembly", @@ -200,52 +216,96 @@ def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]: f"{atom['displacement_type']:>5s}", ] if disorder_present: - row.append(f"{str(atom['disorder_assembly']) if atom['disorder_assembly'] is not None else '.':>4s}") - row.append(f"{str(atom['disorder_group']) if atom['disorder_group'] is not None else '.':>4s}") + row.extend([ + f"{str(atom['disorder_assembly']) if atom['disorder_assembly'] is not None else '.':>4s}", + f"{str(atom['disorder_group']) if atom['disorder_group'] is not None else '.':>4s}", + ]) lines.append(" " + " ".join(row)) lines.append("") + return lines + +def _build_anisotropic_lines(atom_data: list[dict[str, Any]]) -> list[str]: anisotropic_atoms = [atom for atom in atom_data if atom["aniso"]] - if anisotropic_atoms: - lines.extend([ - "loop_", - "_atom_site_aniso_label", - "_atom_site_aniso_U_11", - "_atom_site_aniso_U_22", - "_atom_site_aniso_U_33", - "_atom_site_aniso_U_12", - "_atom_site_aniso_U_13", - "_atom_site_aniso_U_23", - ]) - for atom in anisotropic_atoms: - u = atom["aniso"] - lines.append( - f" {atom['label']:<8s}" - f" {_format_esd(u['u11'], u['e11']):>12s}" - f" {_format_esd(u['u22'], u['e22']):>12s}" - f" {_format_esd(u['u33'], u['e33']):>12s}" - f" {_format_esd(u['u12'], u['e12']):>12s}" - f" {_format_esd(u['u13'], u['e13']):>12s}" - f" {_format_esd(u['u23'], u['e23']):>12s}" - ) - lines.append("") - - if bonds: - lines.extend([ - "loop_", - "_geom_bond_atom_site_label_1", - "_geom_bond_atom_site_label_2", - "_geom_bond_distance", - "_ccdc_geom_bond_type", - ]) - for atom_1, atom_2, distance, bond_type in bonds: - lines.append(f" {atom_1:<8s} {atom_2:<8s} {distance:8.4f} {bond_type}") - lines.append("") + if not anisotropic_atoms: + return [] + lines = [ + "loop_", + "_atom_site_aniso_label", + "_atom_site_aniso_U_11", + "_atom_site_aniso_U_22", + "_atom_site_aniso_U_33", + "_atom_site_aniso_U_12", + "_atom_site_aniso_U_13", + "_atom_site_aniso_U_23", + ] + for atom in anisotropic_atoms: + u = atom["aniso"] + lines.append( + f" {atom['label']:<8s}" + f" {_format_esd(u['u11'], u['e11']):>12s}" + f" {_format_esd(u['u22'], u['e22']):>12s}" + f" {_format_esd(u['u33'], u['e33']):>12s}" + f" {_format_esd(u['u12'], u['e12']):>12s}" + f" {_format_esd(u['u13'], u['e13']):>12s}" + f" {_format_esd(u['u23'], u['e23']):>12s}" + ) + lines.append("") + return lines - lines.append("#END") + +def _build_bond_lines(bonds: list[tuple[str, str, float, str]]) -> list[str]: + if not bonds: + return [] + lines = [ + "loop_", + "_geom_bond_atom_site_label_1", + "_geom_bond_atom_site_label_2", + "_geom_bond_distance", + "_ccdc_geom_bond_type", + ] + for atom_1, atom_2, distance, bond_type in bonds: + lines.append(f" {atom_1:<8s} {atom_2:<8s} {distance:8.4f} {bond_type}") + lines.append("") + return lines + + +def _write_cif_file(output_path: Path, lines: list[str]) -> None: output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text("\n".join(lines), encoding="utf-8") + +def _validated_output_path(output_dir: Path, refcode: str) -> Path: + base_dir = output_dir.expanduser().resolve() + candidate = (base_dir / f"{refcode}.cif").resolve() + try: + candidate.relative_to(base_dir) + except ValueError as exc: + raise ValueError(f"Refusing to write outside output directory: {candidate}") from exc + return candidate + + +def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]: + """Export a CSD structure to CIF with occupancy and disorder metadata.""" + from ccdc.io import EntryReader + + with EntryReader("CSD") as reader: + entry = reader.entry(refcode) + crystal = reader.crystal(refcode) + + molecule = crystal.disordered_molecule or crystal.molecule + disorder_map = _build_disorder_map(crystal) + atom_data = _extract_atom_data(molecule, disorder_map) + bonds = _extract_bonds(molecule) + lines = [] + lines.extend(_build_header_lines(refcode, entry, crystal)) + lines.extend(_build_symmetry_lines(crystal)) + lines.extend(_build_atom_site_lines(atom_data)) + lines.extend(_build_anisotropic_lines(atom_data)) + lines.extend(_build_bond_lines(bonds)) + lines.append("#END") + _write_cif_file(output_path, lines) + return { "refcode": refcode, "output": str(output_path), @@ -268,7 +328,7 @@ def main() -> None: print(f"Licence requirement: {LICENSE_REQUIREMENT}") for refcode in args.refcodes: - output_path = output_dir / f"{refcode}.cif" + output_path = _validated_output_path(output_dir, refcode) summary = export_full_cif_from_csd(refcode, output_path) print( f"Exported {summary['refcode']} -> {summary['output']} " From 6f8f1b5b038ad1673324d3df60c435903d90a66e Mon Sep 17 00:00:00 2001 From: SchrodingersCattt Date: Mon, 29 Jun 2026 10:10:23 +0000 Subject: [PATCH 3/3] Sanitize CLI inputs to address S8707 path injection rule - Validate refcode with strict alphanumeric regex before use as filename - Validate output-dir: reject path traversal (..), enforce cwd-relative - Break taint chain: CLI args no longer flow directly to filesystem ops --- .../demo_csd_disorder_workflow.py | 50 +++++++++++++------ 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py b/scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py index 74a9a8b..e610e14 100644 --- a/scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py +++ b/scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py @@ -35,6 +35,7 @@ from __future__ import annotations import argparse +import re from pathlib import Path from typing import Any @@ -275,23 +276,45 @@ def _write_cif_file(output_path: Path, lines: list[str]) -> None: output_path.write_text("\n".join(lines), encoding="utf-8") -def _validated_output_path(output_dir: Path, refcode: str) -> Path: - base_dir = output_dir.expanduser().resolve() - candidate = (base_dir / f"{refcode}.cif").resolve() - try: - candidate.relative_to(base_dir) - except ValueError as exc: - raise ValueError(f"Refusing to write outside output directory: {candidate}") from exc - return candidate +_SAFE_REFCODE_RE = re.compile(r"^[A-Za-z][A-Za-z0-9]{1,19}$") -def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]: +def _sanitize_refcode(refcode: str) -> str: + """Ensure refcode contains only safe alphanumeric characters.""" + if not _SAFE_REFCODE_RE.match(refcode): + raise ValueError( + f"Invalid refcode {refcode!r}: must be 2-20 alphanumeric characters starting with a letter" + ) + return refcode + + +def _safe_output_dir(raw_path: str) -> Path: + """Resolve and validate output directory, rejecting path traversal.""" + # Only allow relative paths under the current working directory + if raw_path != Path(raw_path).as_posix().replace("\\", "/"): + cleaned = Path(raw_path).as_posix() + else: + cleaned = raw_path + if ".." in cleaned.split("/"): + raise ValueError(f"Path traversal not allowed in output directory: {raw_path!r}") + resolved = Path.cwd() / cleaned + resolved = resolved.resolve() + cwd = Path.cwd().resolve() + if not str(resolved).startswith(str(cwd)): + raise ValueError(f"Output directory must be under working directory: {resolved}") + return resolved + + +def export_full_cif_from_csd(refcode: str, output_dir: Path) -> dict[str, Any]: """Export a CSD structure to CIF with occupancy and disorder metadata.""" from ccdc.io import EntryReader + safe_name = _sanitize_refcode(refcode) + output_path = output_dir / f"{safe_name}.cif" + with EntryReader("CSD") as reader: - entry = reader.entry(refcode) - crystal = reader.crystal(refcode) + entry = reader.entry(safe_name) + crystal = reader.crystal(safe_name) molecule = crystal.disordered_molecule or crystal.molecule disorder_map = _build_disorder_map(crystal) @@ -324,12 +347,11 @@ def parse_args() -> argparse.Namespace: def main() -> None: args = parse_args() - output_dir = Path(args.output_dir) + output_dir = _safe_output_dir(args.output_dir) print(f"Licence requirement: {LICENSE_REQUIREMENT}") for refcode in args.refcodes: - output_path = _validated_output_path(output_dir, refcode) - summary = export_full_cif_from_csd(refcode, output_path) + summary = export_full_cif_from_csd(refcode, output_dir) print( f"Exported {summary['refcode']} -> {summary['output']} " f"(atoms={summary['n_atoms']}, partial_occ={summary['n_partial_occupancy']}, bonds={summary['n_bonds']})"